Skip to content

Commit

Permalink
Cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
SirTyson committed Apr 18, 2024
1 parent ce031d1 commit 574f14f
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 67 deletions.
5 changes: 2 additions & 3 deletions src/bucket/Bucket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -141,15 +141,14 @@ Bucket::apply(Application& app) const
{
ZoneScoped;

UnorderedSet<LedgerKey> emptySet;
std::unordered_set<LedgerKey> emptySet;
BucketApplicator applicator(
app, app.getConfig().LEDGER_PROTOCOL_VERSION,
0 /*set to 0 so we always load from the parent to check state*/,
0 /*set to a level that's not the bottom so we don't treat live entries
as init*/
,
shared_from_this(), [](LedgerEntryType) { return true; },
/*newOffersOnly=*/app.getConfig().isUsingBucketListDB(), emptySet);
shared_from_this(), [](LedgerEntryType) { return true; }, emptySet);
BucketApplicator::Counters counters(app.getClock().now());
while (applicator)
{
Expand Down
21 changes: 12 additions & 9 deletions src/bucket/BucketApplicator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,13 @@ BucketApplicator::BucketApplicator(Application& app,
uint32_t level,
std::shared_ptr<Bucket const> bucket,
std::function<bool(LedgerEntryType)> filter,
bool newOffersOnly,
UnorderedSet<LedgerKey>& seenKeys)
std::unordered_set<LedgerKey>& seenKeys)
: mApp(app)
, mMaxProtocolVersion(maxProtocolVersion)
, mMinProtocolVersionSeen(minProtocolVersionSeen)
, mLevel(level)
, mBucketIter(bucket)
, mEntryTypeFilter(filter)
, mNewOffersOnly(newOffersOnly)
, mSeenKeys(seenKeys)
{
auto protocolVersion = mBucketIter.getMetadata().ledgerVersion;
Expand All @@ -42,9 +40,9 @@ BucketApplicator::BucketApplicator(Application& app,
protocolVersion, mMaxProtocolVersion));
}

if (newOffersOnly && !bucket->isEmpty())
// Only apply offers if BucketListDB is enabled
if (mApp.getConfig().isUsingBucketListDB() && !bucket->isEmpty())
{
releaseAssertOrThrow(mApp.getConfig().isUsingBucketListDB());
auto offsetOp = bucket->getOfferRange();
if (offsetOp)
{
Expand All @@ -62,8 +60,12 @@ BucketApplicator::BucketApplicator(Application& app,

BucketApplicator::operator bool() const
{
// There is more work to do (i.e. (bool) *this == true) iff:
// 1. The underlying bucket iterator is not EOF and
// 2. Either BucketListDB is not enabled (so we must apply all entry types)
// or BucketListDB is enabled and we have offers still remaining.
return static_cast<bool>(mBucketIter) &&
(!mNewOffersOnly || mOffersRemaining);
(!mApp.getConfig().isUsingBucketListDB() || mOffersRemaining);
}

size_t
Expand Down Expand Up @@ -125,7 +127,8 @@ BucketApplicator::advance(BucketApplicator::Counters& counters)
// returns the file offset at the end of the currently loaded entry.
// This means we must read until pos is strictly greater than the upper
// bound so that we don't skip the last offer in the range.
if (mNewOffersOnly && mBucketIter.pos() > mUpperBoundOffset)
auto isUsingBucketListDB = mApp.getConfig().isUsingBucketListDB();
if (isUsingBucketListDB && mBucketIter.pos() > mUpperBoundOffset)
{
mOffersRemaining = false;
break;
Expand All @@ -136,7 +139,7 @@ BucketApplicator::advance(BucketApplicator::Counters& counters)

if (shouldApplyEntry(mEntryTypeFilter, e))
{
if (mNewOffersOnly)
if (isUsingBucketListDB)
{
if (e.type() == LIVEENTRY || e.type() == INITENTRY)
{
Expand Down Expand Up @@ -201,7 +204,7 @@ BucketApplicator::advance(BucketApplicator::Counters& counters)
}
else
{
releaseAssertOrThrow(!mNewOffersOnly);
releaseAssertOrThrow(!isUsingBucketListDB);
if (protocolVersionIsBefore(
mMinProtocolVersionSeen,
Bucket::
Expand Down
6 changes: 3 additions & 3 deletions src/bucket/BucketApplicator.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

#include "bucket/Bucket.h"
#include "bucket/BucketInputIterator.h"
#include "ledger/LedgerHashUtils.h"
#include "util/Timer.h"
#include "util/XDRStream.h"
#include <memory>
Expand All @@ -28,8 +29,7 @@ class BucketApplicator
BucketInputIterator mBucketIter;
size_t mCount{0};
std::function<bool(LedgerEntryType)> mEntryTypeFilter;
bool const mNewOffersOnly;
UnorderedSet<LedgerKey>& mSeenKeys;
std::unordered_set<LedgerKey>& mSeenKeys;
std::streamoff mUpperBoundOffset;
bool mOffersRemaining{true};

Expand Down Expand Up @@ -76,7 +76,7 @@ class BucketApplicator
uint32_t minProtocolVersionSeen, uint32_t level,
std::shared_ptr<Bucket const> bucket,
std::function<bool(LedgerEntryType)> filter,
bool newOffersOnly, UnorderedSet<LedgerKey>& seenKeys);
std::unordered_set<LedgerKey>& seenKeys);
operator bool() const;
size_t advance(Counters& counters);

Expand Down
96 changes: 49 additions & 47 deletions src/catchup/ApplyBucketsWork.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,15 +52,16 @@ class TempLedgerVersionSetter : NonMovableOrCopyable
};

uint32_t
ApplyBucketsWork::startingLevel(bool offersOnly)
ApplyBucketsWork::startingLevel()
{
return offersOnly ? 0 : BucketList::kNumLevels - 1;
return mApp.getConfig().isUsingBucketListDB() ? 0
: BucketList::kNumLevels - 1;
}

bool
ApplyBucketsWork::appliedAllLevels() const
{
if (mOffersOnly)
if (mApp.getConfig().isUsingBucketListDB())
{
return mLevel == BucketList::kNumLevels - 1;
}
Expand All @@ -73,7 +74,7 @@ ApplyBucketsWork::appliedAllLevels() const
uint32_t
ApplyBucketsWork::nextLevel() const
{
return mOffersOnly ? mLevel + 1 : mLevel - 1;
return mApp.getConfig().isUsingBucketListDB() ? mLevel + 1 : mLevel - 1;
}

ApplyBucketsWork::ApplyBucketsWork(
Expand All @@ -87,8 +88,7 @@ ApplyBucketsWork::ApplyBucketsWork(
, mEntryTypeFilter(onlyApply)
, mApplying(false)
, mTotalSize(0)
, mOffersOnly(app.getConfig().isUsingBucketListDB())
, mLevel(startingLevel(mOffersOnly))
, mLevel(startingLevel())
, mMaxProtocolVersion(maxProtocolVersion)
, mCounters(app.getClock().now())
{
Expand Down Expand Up @@ -139,7 +139,7 @@ ApplyBucketsWork::doReset()

if (!isAborting())
{
if (mOffersOnly)
if (mApp.getConfig().isUsingBucketListDB())
{
// The current size of this set is 1.6 million during BucketApply
// (as of 12/20/23). There's not a great way to estimate this, so
Expand Down Expand Up @@ -173,7 +173,7 @@ ApplyBucketsWork::doReset()
mTotalSize += bucket->getSize();
}

if (mOffersOnly)
if (mApp.getConfig().isUsingBucketListDB())
{
mBucketsToIndex.emplace_back(bucket);
}
Expand All @@ -195,10 +195,12 @@ ApplyBucketsWork::doReset()
mApp.getLedgerTxnRoot().prepareNewObjects(totalLECount);
}

mLevel = startingLevel(mOffersOnly);
mLevel = startingLevel();
mApplying = false;
mDelayChecked = false;
mSpawnedAssumeStateWork = false;

mIndexBucketsWork.reset();
mAssumeStateWork.reset();

mFirstBucket.reset();
mSecondBucket.reset();
Expand Down Expand Up @@ -229,24 +231,25 @@ ApplyBucketsWork::startLevel()
CLOG_DEBUG(History, "ApplyBuckets : starting level {}", mLevel);
auto& level = getBucketLevel(mLevel);
HistoryStateBucket const& i = mApplyState.currentBuckets.at(mLevel);
bool isUsingBucketListDB = mApp.getConfig().isUsingBucketListDB();

bool applyFirst = mOffersOnly
bool applyFirst = isUsingBucketListDB
? (i.curr != binToHex(level.getCurr()->getHash()))
: (i.snap != binToHex(level.getSnap()->getHash()));
bool applySecond = mOffersOnly
bool applySecond = isUsingBucketListDB
? (i.snap != binToHex(level.getSnap()->getHash()))
: (i.curr != binToHex(level.getCurr()->getHash()));

if (mApplying || applyFirst)
{
mFirstBucket = getBucket(mOffersOnly ? i.curr : i.snap);
mFirstBucket = getBucket(isUsingBucketListDB ? i.curr : i.snap);
mMinProtocolVersionSeen = std::min(
mMinProtocolVersionSeen, Bucket::getBucketVersion(mFirstBucket));
mFirstBucketApplicator = std::make_unique<BucketApplicator>(
mApp, mMaxProtocolVersion, mMinProtocolVersionSeen, mLevel,
mFirstBucket, mEntryTypeFilter, mOffersOnly, mSeenKeys);
mFirstBucket, mEntryTypeFilter, mSeenKeys);

if (mOffersOnly)
if (isUsingBucketListDB)
{
CLOG_DEBUG(History, "ApplyBuckets : starting level[{}].curr = {}",
mLevel, i.curr);
Expand All @@ -261,14 +264,14 @@ ApplyBucketsWork::startLevel()
}
if (mApplying || applySecond)
{
mSecondBucket = getBucket(mOffersOnly ? i.snap : i.curr);
mSecondBucket = getBucket(isUsingBucketListDB ? i.snap : i.curr);
mMinProtocolVersionSeen = std::min(
mMinProtocolVersionSeen, Bucket::getBucketVersion(mSecondBucket));
mSecondBucketApplicator = std::make_unique<BucketApplicator>(
mApp, mMaxProtocolVersion, mMinProtocolVersionSeen, mLevel,
mSecondBucket, mEntryTypeFilter, mOffersOnly, mSeenKeys);
mSecondBucket, mEntryTypeFilter, mSeenKeys);

if (mOffersOnly)
if (isUsingBucketListDB)
{
CLOG_DEBUG(History, "ApplyBuckets : starting level[{}].snap = {}",
mLevel, i.snap);
Expand All @@ -288,34 +291,26 @@ ApplyBucketsWork::doWork()
ZoneScoped;

// Step 1: index buckets. Step 2: apply buckets. Step 3: assume state
if (!mSpawnedIndexBucketsWork)
bool isUsingBucketListDB = mApp.getConfig().isUsingBucketListDB();
if (isUsingBucketListDB)
{
if (mOffersOnly)
{
addWork<IndexBucketsWork>(mBucketsToIndex);
}
else
if (!mIndexBucketsWork)
{
mFinishedIndexBucketsWork = true;
mIndexBucketsWork = addWork<IndexBucketsWork>(mBucketsToIndex);
}

mSpawnedIndexBucketsWork = true;
}

if (!mFinishedIndexBucketsWork)
{
// If indexing has not finished or finished and failed, return result
// status. If indexing finished and succeeded, move on and spawn assume
// state work.
auto status = checkChildrenStatus();
if (status == BasicWork::State::WORK_SUCCESS)
{
mFinishedIndexBucketsWork = true;
}
else
if (!mIndexBucketsWork->isDone() ||
status != BasicWork::State::WORK_SUCCESS)
{
return status;
}
}

if (!mSpawnedAssumeStateWork)
if (!mAssumeStateWork)
{
if (mApp.getLedgerManager().rebuildingInMemoryState() && !mDelayChecked)
{
Expand Down Expand Up @@ -347,30 +342,35 @@ ApplyBucketsWork::doWork()
if (mFirstBucketApplicator)
{
TempLedgerVersionSetter tlvs(mApp, mMaxProtocolVersion);

// When BucketListDB is enabled, we apply in order starting with
// curr. If BucketListDB is not enabled, we iterate in reverse
// starting with snap.
bool isCurr = isUsingBucketListDB;
if (*mFirstBucketApplicator)
{
advance(mOffersOnly ? "curr" : "snap", *mFirstBucketApplicator);
advance(isCurr ? "curr" : "snap", *mFirstBucketApplicator);
return State::WORK_RUNNING;
}
mApp.getInvariantManager().checkOnBucketApply(
mFirstBucket, mApplyState.currentLedger, mLevel,
/*isCurr=*/mOffersOnly, mEntryTypeFilter);
mFirstBucket, mApplyState.currentLedger, mLevel, isCurr,
mEntryTypeFilter);
mFirstBucketApplicator.reset();
mFirstBucket.reset();
mApp.getCatchupManager().bucketsApplied();
}
if (mSecondBucketApplicator)
{
bool isCurr = !isUsingBucketListDB;
TempLedgerVersionSetter tlvs(mApp, mMaxProtocolVersion);
if (*mSecondBucketApplicator)
{
advance(mOffersOnly ? "snap" : "curr",
*mSecondBucketApplicator);
advance(isCurr ? "curr" : "snap", *mSecondBucketApplicator);
return State::WORK_RUNNING;
}
mApp.getInvariantManager().checkOnBucketApply(
mSecondBucket, mApplyState.currentLedger, mLevel,
/*isCurr=*/!mOffersOnly, mEntryTypeFilter);
mSecondBucket, mApplyState.currentLedger, mLevel, isCurr,
mEntryTypeFilter);
mSecondBucketApplicator.reset();
mSecondBucket.reset();
mApp.getCatchupManager().bucketsApplied();
Expand All @@ -387,9 +387,9 @@ ApplyBucketsWork::doWork()
CLOG_INFO(History, "ApplyBuckets : done, assuming state");

// After all buckets applied, spawn assumeState work
addWork<AssumeStateWork>(mApplyState, mMaxProtocolVersion,
/* restartMerges */ true);
mSpawnedAssumeStateWork = true;
mAssumeStateWork =
addWork<AssumeStateWork>(mApplyState, mMaxProtocolVersion,
/* restartMerges */ true);
}

return checkChildrenStatus();
Expand Down Expand Up @@ -449,7 +449,9 @@ ApplyBucketsWork::getStatus() const
{
// This status string only applies to step 2 when we actually apply the
// buckets.
if (mFinishedIndexBucketsWork && !mSpawnedAssumeStateWork)
bool doneIndexing = !mApp.getConfig().isUsingBucketListDB() ||
(mIndexBucketsWork && mIndexBucketsWork->isDone());
if (doneIndexing && !mSpawnedAssumeStateWork)
{
auto size = mTotalSize == 0 ? 0 : (100 * mAppliedSize / mTotalSize);
return fmt::format(
Expand Down
Loading

0 comments on commit 574f14f

Please sign in to comment.