Skip to content

Commit

Permalink
Added indexing to ApplyBucketsWork and fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
SirTyson committed Feb 9, 2024
1 parent 98229f3 commit 4e16098
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 20 deletions.
4 changes: 2 additions & 2 deletions src/bucket/BucketApplicator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ BucketApplicator::BucketApplicator(Application& app,
std::shared_ptr<Bucket const> bucket,
std::function<bool(LedgerEntryType)> filter,
bool newOffersOnly,
UnorderedSet<LedgerKey> seenKeys)
UnorderedSet<LedgerKey>& seenKeys)
: mApp(app)
, mMaxProtocolVersion(maxProtocolVersion)
, mMinProtocolVersionSeen(minProtocolVersionSeen)
Expand All @@ -42,7 +42,7 @@ BucketApplicator::BucketApplicator(Application& app,
protocolVersion, mMaxProtocolVersion));
}

if (newOffersOnly)
if (newOffersOnly && !bucket->isEmpty())
{
releaseAssertOrThrow(mApp.getConfig().isUsingBucketListDB());
auto [lowOffset, highOffset] = bucket->getOfferRange();
Expand Down
2 changes: 1 addition & 1 deletion src/bucket/BucketApplicator.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ class BucketApplicator
uint32_t minProtocolVersionSeen, uint32_t level,
std::shared_ptr<Bucket const> bucket,
std::function<bool(LedgerEntryType)> filter,
bool newOffersOnly, UnorderedSet<LedgerKey> seenKeys);
bool newOffersOnly, UnorderedSet<LedgerKey>& seenKeys);
operator bool() const;
size_t advance(Counters& counters);

Expand Down
68 changes: 52 additions & 16 deletions src/catchup/ApplyBucketsWork.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include "bucket/BucketManager.h"
#include "catchup/AssumeStateWork.h"
#include "catchup/CatchupManager.h"
#include "catchup/IndexBucketsWork.h"
#include "crypto/Hex.h"
#include "crypto/SecretKey.h"
#include "historywork/Progress.h"
Expand Down Expand Up @@ -108,7 +109,7 @@ ApplyBucketsWork::getBucketLevel(uint32_t level)
return mApp.getBucketManager().getBucketList().getLevel(level);
}

std::shared_ptr<Bucket const>
std::shared_ptr<Bucket>
ApplyBucketsWork::getBucket(std::string const& hash)
{
auto i = mBuckets.find(hash);
Expand All @@ -134,17 +135,18 @@ ApplyBucketsWork::doReset()
mLastPos = 0;
mMinProtocolVersionSeen = UINT32_MAX;
mSeenKeys.clear();

if (mOffersOnly)
{
// The current size of this set is 1.6 million during BucketApply (as of
// 12/20/23). There's not a great way to estimate this, so reserving
// with some extra wiggle room
mSeenKeys.reserve(2'000'000);
}
mBucketsToIndex.clear();

if (!isAborting())
{
if (mOffersOnly)
{
// The current size of this set is 1.6 million during BucketApply
// (as of 12/20/23). There's not a great way to estimate this, so
// reserving with some extra wiggle room
mSeenKeys.reserve(2'000'000);
}

// When applying buckets with accounts, we have to make sure that the
// root account has been removed. This comes into play, for example,
// when applying buckets from genesis the root account already exists.
Expand All @@ -164,12 +166,17 @@ ApplyBucketsWork::doReset()
}
}

auto addBucket = [this](std::shared_ptr<Bucket const> const& bucket) {
auto addBucket = [this](std::shared_ptr<Bucket> const& bucket) {
if (bucket->getSize() > 0)
{
mTotalBuckets++;
mTotalSize += bucket->getSize();
}

if (mOffersOnly)
{
mBucketsToIndex.emplace_back(bucket);
}
};

for (auto const& hsb : mApplyState.currentBuckets)
Expand Down Expand Up @@ -280,7 +287,34 @@ ApplyBucketsWork::doWork()
{
ZoneScoped;

// Step 1: apply buckets. Step 2: assume state
// Step 1: index buckets. Step 2: apply buckets. Step 3: assume state
if (!mSpawnedIndexBucketsWork)
{
if (mOffersOnly)
{
addWork<IndexBucketsWork>(mBucketsToIndex);
}
else
{
mFinishedIndexBucketsWork = true;
}

mSpawnedIndexBucketsWork = true;
}

if (!mFinishedIndexBucketsWork)
{
auto status = checkChildrenStatus();
if (status == BasicWork::State::WORK_SUCCESS)
{
mFinishedIndexBucketsWork = true;
}
else
{
return status;
}
}

if (!mSpawnedAssumeStateWork)
{
if (mApp.getLedgerManager().rebuildingInMemoryState() && !mDelayChecked)
Expand Down Expand Up @@ -319,8 +353,8 @@ ApplyBucketsWork::doWork()
return State::WORK_RUNNING;
}
mApp.getInvariantManager().checkOnBucketApply(
mFirstBucket, mApplyState.currentLedger, mLevel, false,
mEntryTypeFilter);
mFirstBucket, mApplyState.currentLedger, mLevel,
/*isCurr=*/mOffersOnly, mEntryTypeFilter);
mFirstBucketApplicator.reset();
mFirstBucket.reset();
mApp.getCatchupManager().bucketsApplied();
Expand All @@ -335,8 +369,8 @@ ApplyBucketsWork::doWork()
return State::WORK_RUNNING;
}
mApp.getInvariantManager().checkOnBucketApply(
mSecondBucket, mApplyState.currentLedger, mLevel, true,
mEntryTypeFilter);
mSecondBucket, mApplyState.currentLedger, mLevel,
/*isCurr=*/!mOffersOnly, mEntryTypeFilter);
mSecondBucketApplicator.reset();
mSecondBucket.reset();
mApp.getCatchupManager().bucketsApplied();
Expand Down Expand Up @@ -412,7 +446,9 @@ ApplyBucketsWork::isLevelComplete()
std::string
ApplyBucketsWork::getStatus() const
{
if (!mSpawnedAssumeStateWork)
// This status string only applies to step 2 when we actually apply the
// buckets.
if (mFinishedIndexBucketsWork && !mSpawnedAssumeStateWork)
{
auto size = mTotalSize == 0 ? 0 : (100 * mAppliedSize / mTotalSize);
return fmt::format(
Expand Down
5 changes: 4 additions & 1 deletion src/catchup/ApplyBucketsWork.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ class ApplyBucketsWork : public Work

bool mApplying{false};
bool mSpawnedAssumeStateWork{false};
bool mSpawnedIndexBucketsWork{false};
bool mFinishedIndexBucketsWork{false};
size_t mTotalBuckets{0};
size_t mAppliedBuckets{0};
size_t mAppliedEntries{0};
Expand All @@ -40,11 +42,12 @@ class ApplyBucketsWork : public Work
std::unique_ptr<BucketApplicator> mFirstBucketApplicator;
std::unique_ptr<BucketApplicator> mSecondBucketApplicator;
UnorderedSet<LedgerKey> mSeenKeys;
std::vector<std::shared_ptr<Bucket>> mBucketsToIndex;

BucketApplicator::Counters mCounters;

void advance(std::string const& name, BucketApplicator& applicator);
std::shared_ptr<Bucket const> getBucket(std::string const& bucketHash);
std::shared_ptr<Bucket> getBucket(std::string const& bucketHash);
BucketLevel& getBucketLevel(uint32_t level);
void startLevel();
bool isLevelComplete();
Expand Down

0 comments on commit 4e16098

Please sign in to comment.