Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BucketListDB Bucket Apply Optimization #4114

Merged
merged 1 commit into from
Apr 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion src/bucket/Bucket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,12 @@ Bucket::isIndexed() const
return static_cast<bool>(mIndex);
}

std::optional<std::pair<std::streamoff, std::streamoff>>
Bucket::getOfferRange() const
{
return getIndex().getOfferRange();
}

void
Bucket::setIndex(std::unique_ptr<BucketIndex const>&& index)
{
Expand Down Expand Up @@ -135,13 +141,14 @@ Bucket::apply(Application& app) const
{
ZoneScoped;

std::unordered_set<LedgerKey> emptySet;
BucketApplicator applicator(
app, app.getConfig().LEDGER_PROTOCOL_VERSION,
0 /*set to 0 so we always load from the parent to check state*/,
0 /*set to a level that's not the bottom so we don't treat live entries
as init*/
,
shared_from_this(), [](LedgerEntryType) { return true; });
shared_from_this(), [](LedgerEntryType) { return true; }, emptySet);
BucketApplicator::Counters counters(app.getClock().now());
while (applicator)
{
Expand Down
5 changes: 5 additions & 0 deletions src/bucket/Bucket.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,11 @@ class Bucket : public std::enable_shared_from_this<Bucket>,
// Returns true if bucket is indexed, false otherwise
bool isIndexed() const;

// Returns [lowerBound, upperBound) of file offsets for all offers in the
// bucket, or std::nullopt if no offers exist
std::optional<std::pair<std::streamoff, std::streamoff>>
getOfferRange() const;

// Sets index, throws if index is already set
void setIndex(std::unique_ptr<BucketIndex const>&& index);

Expand Down
61 changes: 59 additions & 2 deletions src/bucket/BucketApplicator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,15 @@ BucketApplicator::BucketApplicator(Application& app,
uint32_t minProtocolVersionSeen,
uint32_t level,
std::shared_ptr<Bucket const> bucket,
std::function<bool(LedgerEntryType)> filter)
std::function<bool(LedgerEntryType)> filter,
std::unordered_set<LedgerKey>& seenKeys)
: mApp(app)
, mMaxProtocolVersion(maxProtocolVersion)
, mMinProtocolVersionSeen(minProtocolVersionSeen)
, mLevel(level)
, mBucketIter(bucket)
, mEntryTypeFilter(filter)
, mSeenKeys(seenKeys)
{
auto protocolVersion = mBucketIter.getMetadata().ledgerVersion;
if (protocolVersion > mMaxProtocolVersion)
Expand All @@ -37,11 +39,33 @@ BucketApplicator::BucketApplicator(Application& app,
"bucket protocol version {:d} exceeds maxProtocolVersion {:d}"),
protocolVersion, mMaxProtocolVersion));
}

// Only apply offers if BucketListDB is enabled
if (mApp.getConfig().isUsingBucketListDB() && !bucket->isEmpty())
{
auto offsetOp = bucket->getOfferRange();
if (offsetOp)
{
auto [lowOffset, highOffset] = *offsetOp;
mBucketIter.seek(lowOffset);
mUpperBoundOffset = highOffset;
}
else
{
// No offers in Bucket
mOffersRemaining = false;
}
}
}

BucketApplicator::operator bool() const
{
return (bool)mBucketIter;
// There is more work to do (i.e. (bool) *this == true) iff:
// 1. The underlying bucket iterator is not EOF and
// 2. Either BucketListDB is not enabled (so we must apply all entry types)
// or BucketListDB is enabled and we have offers still remaining.
return static_cast<bool>(mBucketIter) &&
(!mApp.getConfig().isUsingBucketListDB() || mOffersRemaining);
}

size_t
Expand Down Expand Up @@ -99,11 +123,43 @@ BucketApplicator::advance(BucketApplicator::Counters& counters)

for (; mBucketIter; ++mBucketIter)
{
// Note: mUpperBoundOffset is not inclusive. However, mBucketIter.pos()
// returns the file offset at the end of the currently loaded entry.
// This means we must read until pos is strictly greater than the upper
// bound so that we don't skip the last offer in the range.
auto isUsingBucketListDB = mApp.getConfig().isUsingBucketListDB();
if (isUsingBucketListDB && mBucketIter.pos() > mUpperBoundOffset)
{
mOffersRemaining = false;
break;
}

BucketEntry const& e = *mBucketIter;
Bucket::checkProtocolLegality(e, mMaxProtocolVersion);

if (shouldApplyEntry(mEntryTypeFilter, e))
{
if (isUsingBucketListDB)
{
if (e.type() == LIVEENTRY || e.type() == INITENTRY)
{
auto [_, wasInserted] =
mSeenKeys.emplace(LedgerEntryKey(e.liveEntry()));

// Skip seen keys
if (!wasInserted)
{
continue;
}
}
else
{
// Only apply INIT and LIVE entries
mSeenKeys.emplace(e.deadEntry());
continue;
}
}

counters.mark(e);

if (e.type() == LIVEENTRY || e.type() == INITENTRY)
Expand Down Expand Up @@ -148,6 +204,7 @@ BucketApplicator::advance(BucketApplicator::Counters& counters)
}
else
{
releaseAssertOrThrow(!isUsingBucketListDB);
if (protocolVersionIsBefore(
mMinProtocolVersionSeen,
Bucket::
Expand Down
12 changes: 11 additions & 1 deletion src/bucket/BucketApplicator.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

#include "bucket/Bucket.h"
#include "bucket/BucketInputIterator.h"
#include "ledger/LedgerHashUtils.h"
#include "util/Timer.h"
#include "util/XDRStream.h"
#include <memory>
Expand All @@ -28,6 +29,9 @@ class BucketApplicator
BucketInputIterator mBucketIter;
size_t mCount{0};
std::function<bool(LedgerEntryType)> mEntryTypeFilter;
std::unordered_set<LedgerKey>& mSeenKeys;
std::streamoff mUpperBoundOffset;
bool mOffersRemaining{true};

public:
class Counters
Expand Down Expand Up @@ -63,10 +67,16 @@ class BucketApplicator
VirtualClock::time_point now);
};

// If newOffersOnly is true, only offers are applied. Additionally, the
// offer is only applied iff:
// 1. They are of type INITENTRY or LIVEENTRY
// 2. The LedgerKey is not in seenKeys
// When this flag is set, each offer key read is added to seenKeys
BucketApplicator(Application& app, uint32_t maxProtocolVersion,
uint32_t minProtocolVersionSeen, uint32_t level,
std::shared_ptr<Bucket const> bucket,
std::function<bool(LedgerEntryType)> filter);
std::function<bool(LedgerEntryType)> filter,
std::unordered_set<LedgerKey>& seenKeys);
operator bool() const;
size_t advance(Counters& counters);

Expand Down
5 changes: 5 additions & 0 deletions src/bucket/BucketIndex.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,11 @@ class BucketIndex : public NonMovableOrCopyable
virtual std::vector<PoolID> const&
getPoolIDsByAsset(Asset const& asset) const = 0;

// Returns lower bound and upper bound for offer entry positions in the
// given bucket, or std::nullopt if no offers exist
virtual std::optional<std::pair<std::streamoff, std::streamoff>>
getOfferRange() const = 0;

// Returns page size for index. InidividualIndex returns 0 for page size
virtual std::streamoff getPageSize() const = 0;

Expand Down
50 changes: 38 additions & 12 deletions src/bucket/BucketIndexImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -450,16 +450,9 @@ BucketIndexImpl<IndexT>::scan(Iterator start, LedgerKey const& k) const

template <class IndexT>
std::optional<std::pair<std::streamoff, std::streamoff>>
BucketIndexImpl<IndexT>::getPoolshareTrustlineRange(
AccountID const& accountID) const
BucketIndexImpl<IndexT>::getOffsetBounds(LedgerKey const& lowerBound,
LedgerKey const& upperBound) const
{
// Get the smallest and largest possible trustline keys for the given
// accountID
auto upperBound = getDummyPoolShareTrustlineKey(
accountID, std::numeric_limits<uint8_t>::max());
auto lowerBound = getDummyPoolShareTrustlineKey(
accountID, std::numeric_limits<uint8_t>::min());

// Get the index iterators for the bounds
auto startIter = std::lower_bound(
mData.keysToOffset.begin(), mData.keysToOffset.end(), lowerBound,
Expand All @@ -469,9 +462,9 @@ BucketIndexImpl<IndexT>::getPoolshareTrustlineRange(
return std::nullopt;
}

auto endIter =
std::upper_bound(startIter, mData.keysToOffset.end(), upperBound,
upper_bound_pred<typename IndexT::value_type>);
auto endIter = std::upper_bound(
std::next(startIter), mData.keysToOffset.end(), upperBound,
upper_bound_pred<typename IndexT::value_type>);

// Get file offsets based on lower and upper bound iterators
std::streamoff startOff = startIter->second;
Expand Down Expand Up @@ -501,6 +494,39 @@ BucketIndexImpl<IndexT>::getPoolIDsByAsset(Asset const& asset) const
return iter->second;
}

template <class IndexT>
std::optional<std::pair<std::streamoff, std::streamoff>>
BucketIndexImpl<IndexT>::getPoolshareTrustlineRange(
AccountID const& accountID) const
{
// Get the smallest and largest possible trustline keys for the given
// accountID
auto upperBound = getDummyPoolShareTrustlineKey(
accountID, std::numeric_limits<uint8_t>::max());
auto lowerBound = getDummyPoolShareTrustlineKey(
accountID, std::numeric_limits<uint8_t>::min());

return getOffsetBounds(lowerBound, upperBound);
}

template <class IndexT>
std::optional<std::pair<std::streamoff, std::streamoff>>
BucketIndexImpl<IndexT>::getOfferRange() const
{
// Get the smallest and largest possible offer keys
LedgerKey upperBound(OFFER);
upperBound.offer().sellerID.ed25519().fill(
std::numeric_limits<uint8_t>::max());
upperBound.offer().offerID = std::numeric_limits<int64_t>::max();

LedgerKey lowerBound(OFFER);
lowerBound.offer().sellerID.ed25519().fill(
std::numeric_limits<uint8_t>::min());
lowerBound.offer().offerID = std::numeric_limits<int64_t>::min();
sisuresh marked this conversation as resolved.
Show resolved Hide resolved

return getOffsetBounds(lowerBound, upperBound);
}

#ifdef BUILD_TESTS
template <class IndexT>
bool
Expand Down
9 changes: 9 additions & 0 deletions src/bucket/BucketIndexImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,12 @@ template <class IndexT> class BucketIndexImpl : public BucketIndex
// Saves index to disk, overwriting any preexisting file for this index
void saveToDisk(BucketManager& bm, Hash const& hash) const;

// Returns [lowFileOffset, highFileOffset) that contain the key ranges
// [lowerBound, upperBound]. If no file offsets exist, returns [0, 0]
std::optional<std::pair<std::streamoff, std::streamoff>>
getOffsetBounds(LedgerKey const& lowerBound,
LedgerKey const& upperBound) const;

friend BucketIndex;

public:
Expand All @@ -81,6 +87,9 @@ template <class IndexT> class BucketIndexImpl : public BucketIndex
virtual std::vector<PoolID> const&
getPoolIDsByAsset(Asset const& asset) const override;

virtual std::optional<std::pair<std::streamoff, std::streamoff>>
getOfferRange() const override;

virtual std::streamoff
getPageSize() const override
{
Expand Down
9 changes: 8 additions & 1 deletion src/bucket/BucketInputIterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ BucketInputIterator::loadEntry()
}
}

size_t
std::streamoff
BucketInputIterator::pos()
{
return mIn.pos();
Expand Down Expand Up @@ -124,4 +124,11 @@ BucketInputIterator::operator++()
}
return *this;
}

void
BucketInputIterator::seek(std::streamoff offset)
{
mIn.seek(offset);
loadEntry();
}
}
3 changes: 2 additions & 1 deletion src/bucket/BucketInputIterator.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ class BucketInputIterator

BucketInputIterator& operator++();

size_t pos();
std::streamoff pos();
size_t size() const;
void seek(std::streamoff offset);
};
}
Loading
Loading