Skip to content

Commit

Permalink
BucketApply in-order iteration for offers
Browse files Browse the repository at this point in the history
  • Loading branch information
SirTyson committed Feb 9, 2024
1 parent 6dea1f5 commit 98229f3
Show file tree
Hide file tree
Showing 7 changed files with 191 additions and 50 deletions.
4 changes: 3 additions & 1 deletion src/bucket/Bucket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -304,13 +304,15 @@ Bucket::apply(Application& app) const
{
ZoneScoped;

UnorderedSet<LedgerKey> emptySet;
BucketApplicator applicator(
app, app.getConfig().LEDGER_PROTOCOL_VERSION,
0 /*set to 0 so we always load from the parent to check state*/,
0 /*set to a level that's not the bottom so we don't treat live entries
as init*/
,
shared_from_this(), [](LedgerEntryType) { return true; });
shared_from_this(), [](LedgerEntryType) { return true; },
/*newOffersOnly=*/app.getConfig().isUsingBucketListDB(), emptySet);
BucketApplicator::Counters counters(app.getClock().now());
while (applicator)
{
Expand Down
45 changes: 43 additions & 2 deletions src/bucket/BucketApplicator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,17 @@ BucketApplicator::BucketApplicator(Application& app,
uint32_t minProtocolVersionSeen,
uint32_t level,
std::shared_ptr<Bucket const> bucket,
std::function<bool(LedgerEntryType)> filter)
std::function<bool(LedgerEntryType)> filter,
bool newOffersOnly,
UnorderedSet<LedgerKey> seenKeys)
: mApp(app)
, mMaxProtocolVersion(maxProtocolVersion)
, mMinProtocolVersionSeen(minProtocolVersionSeen)
, mLevel(level)
, mBucketIter(bucket)
, mEntryTypeFilter(filter)
, mNewOffersOnly(newOffersOnly)
, mSeenKeys(seenKeys)
{
auto protocolVersion = mBucketIter.getMetadata().ledgerVersion;
if (protocolVersion > mMaxProtocolVersion)
Expand All @@ -37,11 +41,20 @@ BucketApplicator::BucketApplicator(Application& app,
"bucket protocol version {:d} exceeds maxProtocolVersion {:d}"),
protocolVersion, mMaxProtocolVersion));
}

if (newOffersOnly)
{
releaseAssertOrThrow(mApp.getConfig().isUsingBucketListDB());
auto [lowOffset, highOffset] = bucket->getOfferRange();
mBucketIter.seek(lowOffset);
mUpperBoundOffset = highOffset;
}
}

BucketApplicator::operator bool() const
{
return (bool)mBucketIter;
return static_cast<bool>(mBucketIter) &&
(!mNewOffersOnly || mOffersRemaining);
}

size_t
Expand Down Expand Up @@ -99,11 +112,38 @@ BucketApplicator::advance(BucketApplicator::Counters& counters)

for (; mBucketIter; ++mBucketIter)
{
if (mNewOffersOnly && mBucketIter.pos() >= mUpperBoundOffset)
{
mOffersRemaining = false;
break;
}

BucketEntry const& e = *mBucketIter;
Bucket::checkProtocolLegality(e, mMaxProtocolVersion);

if (shouldApplyEntry(mEntryTypeFilter, e))
{
if (mNewOffersOnly)
{
if (e.type() == LIVEENTRY || e.type() == INITENTRY)
{
auto [_, wasInserted] =
mSeenKeys.emplace(LedgerEntryKey(e.liveEntry()));

// Skip seen keys
if (!wasInserted)
{
continue;
}
}
else
{
// Only apply INIT and LIVE entries
mSeenKeys.emplace(e.deadEntry());
continue;
}
}

counters.mark(e);

if (e.type() == LIVEENTRY || e.type() == INITENTRY)
Expand Down Expand Up @@ -148,6 +188,7 @@ BucketApplicator::advance(BucketApplicator::Counters& counters)
}
else
{
releaseAssertOrThrow(!mNewOffersOnly);
if (protocolVersionIsBefore(
mMinProtocolVersionSeen,
Bucket::
Expand Down
12 changes: 11 additions & 1 deletion src/bucket/BucketApplicator.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ class BucketApplicator
BucketInputIterator mBucketIter;
size_t mCount{0};
std::function<bool(LedgerEntryType)> mEntryTypeFilter;
bool const mNewOffersOnly;
UnorderedSet<LedgerKey>& mSeenKeys;
std::streamoff mUpperBoundOffset;
bool mOffersRemaining{true};

public:
class Counters
Expand Down Expand Up @@ -63,10 +67,16 @@ class BucketApplicator
VirtualClock::time_point now);
};

// If newOffersOnly is true, only offers are applied. Additionally, the
// offer is only applied iff:
// 1. They are of type INITENTRY or LIVEENTRY
// 2. The LedgerKey is not in seenKeys
// When this flag is set, each offer key read is added to seenKeys
BucketApplicator(Application& app, uint32_t maxProtocolVersion,
uint32_t minProtocolVersionSeen, uint32_t level,
std::shared_ptr<Bucket const> bucket,
std::function<bool(LedgerEntryType)> filter);
std::function<bool(LedgerEntryType)> filter,
bool newOffersOnly, UnorderedSet<LedgerKey> seenKeys);
operator bool() const;
size_t advance(Counters& counters);

Expand Down
7 changes: 7 additions & 0 deletions src/bucket/BucketInputIterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,4 +124,11 @@ BucketInputIterator::operator++()
}
return *this;
}

void
BucketInputIterator::seek(std::streamoff offset)
{
mIn.seek(offset);
loadEntry();
}
}
1 change: 1 addition & 0 deletions src/bucket/BucketInputIterator.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,5 +54,6 @@ class BucketInputIterator

size_t pos();
size_t size() const;
void seek(std::streamoff offset);
};
}
158 changes: 116 additions & 42 deletions src/catchup/ApplyBucketsWork.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,31 @@ class TempLedgerVersionSetter : NonMovableOrCopyable
}
};

uint32_t
ApplyBucketsWork::startingLevel(bool offersOnly)
{
return offersOnly ? 0 : BucketList::kNumLevels - 1;
}

bool
ApplyBucketsWork::appliedAllLevels() const
{
if (mOffersOnly)
{
return mLevel == BucketList::kNumLevels - 1;
}
else
{
return mLevel == 0;
}
}

uint32_t
ApplyBucketsWork::nextLevel() const
{
return mOffersOnly ? mLevel + 1 : mLevel - 1;
}

ApplyBucketsWork::ApplyBucketsWork(
Application& app,
std::map<std::string, std::shared_ptr<Bucket>> const& buckets,
Expand All @@ -61,7 +86,8 @@ ApplyBucketsWork::ApplyBucketsWork(
, mEntryTypeFilter(onlyApply)
, mApplying(false)
, mTotalSize(0)
, mLevel(BucketList::kNumLevels - 1)
, mOffersOnly(app.getConfig().isUsingBucketListDB())
, mLevel(startingLevel(mOffersOnly))
, mMaxProtocolVersion(maxProtocolVersion)
, mCounters(app.getClock().now())
{
Expand Down Expand Up @@ -107,6 +133,15 @@ ApplyBucketsWork::doReset()
mLastAppliedSizeMb = 0;
mLastPos = 0;
mMinProtocolVersionSeen = UINT32_MAX;
mSeenKeys.clear();

if (mOffersOnly)
{
// The current size of this set is 1.6 million during BucketApply (as of
// 12/20/23). There's not a great way to estimate this, so reserving
// with some extra wiggle room
mSeenKeys.reserve(2'000'000);
}

if (!isAborting())
{
Expand Down Expand Up @@ -153,17 +188,31 @@ ApplyBucketsWork::doReset()
mApp.getLedgerTxnRoot().prepareNewObjects(totalLECount);
}

mLevel = BucketList::kNumLevels - 1;
mLevel = startingLevel(mOffersOnly);
mApplying = false;
mDelayChecked = false;
mSpawnedAssumeStateWork = false;

mSnapBucket.reset();
mCurrBucket.reset();
mSnapApplicator.reset();
mCurrApplicator.reset();
mFirstBucket.reset();
mSecondBucket.reset();
mFirstBucketApplicator.reset();
mSecondBucketApplicator.reset();
}

// We iterate through the BucketList either in-order (level 0 curr, level 0
// snap, level 1 curr, etc) when only applying offers, or in reverse order
// (level 9 curr, level 8 snap, level 8 curr, etc) when applying all entry
// types. When only applying offers, we keep track of the keys we have already
// seen, and only apply an entry to the DB if it has not been seen before. This
// allows us to perform a single write to the DB and ensure that only the newest
// version is written.
//
// When applying all entry types, this seen keys set would be too large. Since
// there can be no seen keys set, if we were to apply every entry in order, we
// would overwrite the newest version of an entry with an older version as we
// iterate through the BucketList. Due to this, we iterate in reverse order such
// that the newest version of a key is written last, overwriting the older
// versions. This is much slower due to DB churn.
void
ApplyBucketsWork::startLevel()
{
Expand All @@ -174,31 +223,54 @@ ApplyBucketsWork::startLevel()
auto& level = getBucketLevel(mLevel);
HistoryStateBucket const& i = mApplyState.currentBuckets.at(mLevel);

bool applySnap = (i.snap != binToHex(level.getSnap()->getHash()));
bool applyCurr = (i.curr != binToHex(level.getCurr()->getHash()));
bool applyFirst = mOffersOnly
? (i.curr != binToHex(level.getCurr()->getHash()))
: (i.snap != binToHex(level.getSnap()->getHash()));
bool applySecond = mOffersOnly
? (i.snap != binToHex(level.getSnap()->getHash()))
: (i.curr != binToHex(level.getCurr()->getHash()));

if (mApplying || applySnap)
if (mApplying || applyFirst)
{
mSnapBucket = getBucket(i.snap);
mFirstBucket = getBucket(mOffersOnly ? i.curr : i.snap);
mMinProtocolVersionSeen = std::min(
mMinProtocolVersionSeen, Bucket::getBucketVersion(mSnapBucket));
mSnapApplicator = std::make_unique<BucketApplicator>(
mMinProtocolVersionSeen, Bucket::getBucketVersion(mFirstBucket));
mFirstBucketApplicator = std::make_unique<BucketApplicator>(
mApp, mMaxProtocolVersion, mMinProtocolVersionSeen, mLevel,
mSnapBucket, mEntryTypeFilter);
CLOG_DEBUG(History, "ApplyBuckets : starting level[{}].snap = {}",
mLevel, i.snap);
mFirstBucket, mEntryTypeFilter, mOffersOnly, mSeenKeys);

if (mOffersOnly)
{
CLOG_DEBUG(History, "ApplyBuckets : starting level[{}].curr = {}",
mLevel, i.curr);
}
else
{
CLOG_DEBUG(History, "ApplyBuckets : starting level[{}].snap = {}",
mLevel, i.snap);
}

mApplying = true;
}
if (mApplying || applyCurr)
if (mApplying || applySecond)
{
mCurrBucket = getBucket(i.curr);
mSecondBucket = getBucket(mOffersOnly ? i.snap : i.curr);
mMinProtocolVersionSeen = std::min(
mMinProtocolVersionSeen, Bucket::getBucketVersion(mCurrBucket));
mCurrApplicator = std::make_unique<BucketApplicator>(
mMinProtocolVersionSeen, Bucket::getBucketVersion(mSecondBucket));
mSecondBucketApplicator = std::make_unique<BucketApplicator>(
mApp, mMaxProtocolVersion, mMinProtocolVersionSeen, mLevel,
mCurrBucket, mEntryTypeFilter);
CLOG_DEBUG(History, "ApplyBuckets : starting level[{}].curr = {}",
mLevel, i.curr);
mSecondBucket, mEntryTypeFilter, mOffersOnly, mSeenKeys);

if (mOffersOnly)
{
CLOG_DEBUG(History, "ApplyBuckets : starting level[{}].snap = {}",
mLevel, i.snap);
}
else
{
CLOG_DEBUG(History, "ApplyBuckets : starting level[{}].curr = {}",
mLevel, i.curr);
}
mApplying = true;
}
}
Expand Down Expand Up @@ -232,45 +304,47 @@ ApplyBucketsWork::doWork()
}

// The structure of these if statements is motivated by the following:
// 1. mCurrApplicator should never be advanced if mSnapApplicator is
// not false. Otherwise it is possible for curr to modify the
// database when the invariants for snap are checked.
// 2. There is no reason to advance mSnapApplicator or mCurrApplicator
// if there is nothing to be applied.
if (mSnapApplicator)
// 1. mSecondBucketApplicator should never be advanced if
// mFirstBucketApplicator is not false. Otherwise it is possible for
// second bucket to modify the database when the invariants for first
// bucket are checked.
// 2. There is no reason to advance mFirstBucketApplicator or
// mSecondBucketApplicator if there is nothing to be applied.
if (mFirstBucketApplicator)
{
TempLedgerVersionSetter tlvs(mApp, mMaxProtocolVersion);
if (*mSnapApplicator)
if (*mFirstBucketApplicator)
{
advance("snap", *mSnapApplicator);
advance(mOffersOnly ? "curr" : "snap", *mFirstBucketApplicator);
return State::WORK_RUNNING;
}
mApp.getInvariantManager().checkOnBucketApply(
mSnapBucket, mApplyState.currentLedger, mLevel, false,
mFirstBucket, mApplyState.currentLedger, mLevel, false,
mEntryTypeFilter);
mSnapApplicator.reset();
mSnapBucket.reset();
mFirstBucketApplicator.reset();
mFirstBucket.reset();
mApp.getCatchupManager().bucketsApplied();
}
if (mCurrApplicator)
if (mSecondBucketApplicator)
{
TempLedgerVersionSetter tlvs(mApp, mMaxProtocolVersion);
if (*mCurrApplicator)
if (*mSecondBucketApplicator)
{
advance("curr", *mCurrApplicator);
advance(mOffersOnly ? "snap" : "curr",
*mSecondBucketApplicator);
return State::WORK_RUNNING;
}
mApp.getInvariantManager().checkOnBucketApply(
mCurrBucket, mApplyState.currentLedger, mLevel, true,
mSecondBucket, mApplyState.currentLedger, mLevel, true,
mEntryTypeFilter);
mCurrApplicator.reset();
mCurrBucket.reset();
mSecondBucketApplicator.reset();
mSecondBucket.reset();
mApp.getCatchupManager().bucketsApplied();
}

if (mLevel != 0)
if (!appliedAllLevels())
{
--mLevel;
mLevel = nextLevel();
CLOG_DEBUG(History, "ApplyBuckets : starting next level: {}",
mLevel);
return State::WORK_RUNNING;
Expand Down Expand Up @@ -332,7 +406,7 @@ ApplyBucketsWork::advance(std::string const& bucketName,
bool
ApplyBucketsWork::isLevelComplete()
{
return !(mApplying) || !(mSnapApplicator || mCurrApplicator);
return !(mApplying) || !(mFirstBucketApplicator || mSecondBucketApplicator);
}

std::string
Expand Down
Loading

0 comments on commit 98229f3

Please sign in to comment.