Skip to content

Commit

Permalink
perf: improve ycsb performance
Browse files Browse the repository at this point in the history
Signed-off-by: Jian Zhang <[email protected]>
  • Loading branch information
zz-jason committed Sep 4, 2024
1 parent 925a074 commit 89c6ec8
Show file tree
Hide file tree
Showing 9 changed files with 38 additions and 65 deletions.
6 changes: 4 additions & 2 deletions include/leanstore/concurrency/ConcurrencyControl.hpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once

#include "leanstore/LeanStore.hpp"
#include "leanstore/Slice.hpp"
#include "leanstore/Units.hpp"
#include "leanstore/concurrency/HistoryStorage.hpp"
#include "leanstore/profiling/counters/CRCounters.hpp"
Expand Down Expand Up @@ -193,11 +194,12 @@ class ConcurrencyControl {
//! @param getCallback: the callback function to be called when the version is found.
//! @return: true if the version is found, false otherwise.
inline bool GetVersion(WORKERID newerWorkerId, TXID newerTxId, COMMANDID newerCommandId,
std::function<void(const uint8_t*, uint64_t versionSize)> getCallback) {
std::function<void(Slice)> versionCallback) {
utils::Timer timer(CRCounters::MyCounters().cc_ms_history_tree_retrieve);
auto isRemoveCommand = newerCommandId & kRemoveCommandMark;
return Other(newerWorkerId)
.mHistoryStorage.GetVersion(newerTxId, newerCommandId, isRemoveCommand, getCallback);
.mHistoryStorage.GetVersion(newerTxId, newerCommandId, isRemoveCommand,
std::move(versionCallback));
}

//! Put a version to the version storage. The callback function is called with the version data
Expand Down
3 changes: 2 additions & 1 deletion include/leanstore/concurrency/HistoryStorage.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once

#include "leanstore/Slice.hpp"
#include "leanstore/Units.hpp"

#include <cstdint>
Expand Down Expand Up @@ -88,7 +89,7 @@ class HistoryStorage {
uint64_t payloadLength, std::function<void(uint8_t*)> cb, bool sameThread = true);

bool GetVersion(TXID newerTxId, COMMANDID newerCommandId, const bool isRemoveCommand,
std::function<void(const uint8_t*, uint64_t)> cb);
std::function<void(Slice)> cb);

void PurgeVersions(TXID fromTxId, TXID toTxId, RemoveVersionCallback cb, const uint64_t limit);

Expand Down
19 changes: 5 additions & 14 deletions include/leanstore/concurrency/Logging.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,19 +57,11 @@ class Logging {
//! processing.
WalEntryComplex* mActiveWALEntryComplex;

//! Protects mTxToCommit
std::mutex mTxToCommitMutex;
//! The pending-to-commit transactions which have remote dependencies.
std::atomic<Transaction*> mActiveTxToCommit;

//! The queue for each worker thread to store pending-to-commit transactions which have remote
//! dependencies.
std::vector<Transaction> mTxToCommit;

//! Protects mTxToCommit
std::mutex mRfaTxToCommitMutex;

//! The queue for each worker thread to store pending-to-commit transactions which doesn't have
//! any remote dependencies.
std::vector<Transaction> mRfaTxToCommit;
//! The pending-to-commit transactions which doesn't have any remote dependencies.
std::atomic<Transaction*> mActiveRfaTxToCommit;

//! Represents the maximum commit timestamp in the worker. Transactions in the worker are
//! committed if their commit timestamps are smaller than it.
Expand Down Expand Up @@ -137,8 +129,7 @@ class Logging {

void publishWalFlushReq();

//! Calculate the continuous free space left in the wal ring buffer. Return
//! size of the contiguous free space.
//! Continuous free space left in the wal ring buffer.
uint32_t walContiguousFreeSpace();
};

Expand Down
5 changes: 3 additions & 2 deletions src/btree/ChainedTuple.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@ std::tuple<OpCode, uint16_t> ChainedTuple::GetVisibleTuple(Slice payload,
uint16_t versionsRead = 1;
while (true) {
bool found = cr::WorkerContext::My().mCc.GetVersion(
newerWorkerId, newerTxId, newerCommandId,
[&](const uint8_t* versionBuf, uint64_t versionSize) {
newerWorkerId, newerTxId, newerCommandId, [&](Slice versionSlice) {
auto* versionBuf = versionSlice.data();
auto versionSize = versionSlice.size();
auto& version = *reinterpret_cast<const Version*>(versionBuf);
switch (version.mType) {
case VersionType::kUpdate: {
Expand Down
4 changes: 2 additions & 2 deletions src/btree/TransactionKV.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -278,12 +278,12 @@ std::tuple<OpCode, uint16_t> TransactionKV::getVisibleTuple(Slice payload, ValCa
switch (tuple->mFormat) {
case TupleFormat::kChained: {
const auto* const chainedTuple = ChainedTuple::From(payload.data());
ret = chainedTuple->GetVisibleTuple(payload, callback);
ret = chainedTuple->GetVisibleTuple(payload, std::move(callback));
JUMPMU_RETURN ret;
}
case TupleFormat::kFat: {
const auto* const fatTuple = FatTuple::From(payload.data());
ret = fatTuple->GetVisibleTuple(callback);
ret = fatTuple->GetVisibleTuple(std::move(callback));
JUMPMU_RETURN ret;
}
default: {
Expand Down
4 changes: 2 additions & 2 deletions src/btree/Tuple.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,9 @@ bool Tuple::ToFat(PessimisticExclusiveIterator& xIter) {
}

if (!cr::WorkerContext::My().mCc.GetVersion(
newerWorkerId, newerTxId, newerCommandId, [&](const uint8_t* version, uint64_t) {
newerWorkerId, newerTxId, newerCommandId, [&](Slice version) {
numDeltasToReplace++;
const auto& chainedDelta = *UpdateVersion::From(version);
const auto& chainedDelta = *UpdateVersion::From(version.data());
LS_DCHECK(chainedDelta.mType == VersionType::kUpdate);
LS_DCHECK(chainedDelta.mIsDelta);

Expand Down
51 changes: 16 additions & 35 deletions src/concurrency/GroupCommitter.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include "leanstore/concurrency/GroupCommitter.hpp"

#include "leanstore/concurrency/CRManager.hpp"
#include "leanstore/concurrency/Transaction.hpp"
#include "leanstore/concurrency/WorkerContext.hpp"
#include "leanstore/profiling/counters/CPUCounters.hpp"
#include "leanstore/telemetry/MetricOnlyTimer.hpp"
Expand Down Expand Up @@ -41,7 +42,7 @@ void GroupCommitter::runImpl() {
}

void GroupCommitter::collectWalRecords(TXID& minFlushedSysTx, TXID& minFlushedUsrTx,
std::vector<uint64_t>& numRfaTxs,
std::vector<uint64_t>& numRfaTxs [[maybe_unused]],
std::vector<WalFlushReq>& walFlushReqCopies) {
leanstore::telemetry::MetricOnlyTimer timer;
SCOPED_DEFER({
Expand All @@ -54,11 +55,6 @@ void GroupCommitter::collectWalRecords(TXID& minFlushedSysTx, TXID& minFlushedUs
for (auto workerId = 0u; workerId < mWorkerCtxs.size(); workerId++) {
auto& logging = mWorkerCtxs[workerId]->mLogging;

// collect logging info
std::unique_lock<std::mutex> guard(logging.mRfaTxToCommitMutex);
numRfaTxs[workerId] = logging.mRfaTxToCommit.size();
guard.unlock();

auto lastReqVersion = walFlushReqCopies[workerId].mVersion;
auto version = logging.mWalFlushReq.Get(walFlushReqCopies[workerId]);
walFlushReqCopies[workerId].mVersion = version;
Expand Down Expand Up @@ -120,7 +116,7 @@ void GroupCommitter::flushWalRecords() {
}

void GroupCommitter::determineCommitableTx(TXID minFlushedSysTx, TXID minFlushedUsrTx,
const std::vector<uint64_t>& numRfaTxs,
const std::vector<uint64_t>& numRfaTxs [[maybe_unused]],
const std::vector<WalFlushReq>& walFlushReqCopies) {
leanstore::telemetry::MetricOnlyTimer timer;
SCOPED_DEFER({
Expand All @@ -137,41 +133,26 @@ void GroupCommitter::determineCommitableTx(TXID minFlushedSysTx, TXID minFlushed
// commit transactions with remote dependency
TXID maxCommitTs = 0;
{
std::unique_lock<std::mutex> g(logging.mTxToCommitMutex);
uint64_t i = 0;
for (; i < logging.mTxToCommit.size(); ++i) {
auto& tx = logging.mTxToCommit[i];
if (!tx.CanCommit(minFlushedSysTx, minFlushedUsrTx)) {
break;
}
maxCommitTs = std::max<TXID>(maxCommitTs, tx.mCommitTs);
tx.mState = TxState::kCommitted;
if (auto* tx = logging.mActiveTxToCommit.load(std::memory_order_relaxed);
tx != nullptr && tx->CanCommit(minFlushedSysTx, minFlushedUsrTx)) {
maxCommitTs = tx->mCommitTs;
tx->mState = TxState::kCommitted;
logging.mActiveTxToCommit.store(nullptr);
LS_DLOG("Transaction with remote dependency committed, workerId={}, startTs={}, "
"commitTs={}, minFlushedSysTx={}, minFlushedUsrTx={}",
workerId, tx.mStartTs, tx.mCommitTs, minFlushedSysTx, minFlushedUsrTx);
}
if (i > 0) {
logging.mTxToCommit.erase(logging.mTxToCommit.begin(), logging.mTxToCommit.begin() + i);
workerId, tx->mStartTs, tx->mCommitTs, minFlushedSysTx, minFlushedUsrTx);
}
}

// commit transactions without remote dependency
TXID maxCommitTsRfa = 0;
{
std::unique_lock<std::mutex> g(logging.mRfaTxToCommitMutex);
uint64_t i = 0;
for (; i < numRfaTxs[workerId]; ++i) {
auto& tx = logging.mRfaTxToCommit[i];
maxCommitTsRfa = std::max<TXID>(maxCommitTsRfa, tx.mCommitTs);
tx.mState = TxState::kCommitted;
LS_DLOG("Transaction without remote dependency committed, workerId={}, startTs={}, "
"commitTs={}",
workerId, tx.mStartTs, tx.mCommitTs);
}
if (i > 0) {
logging.mRfaTxToCommit.erase(logging.mRfaTxToCommit.begin(),
logging.mRfaTxToCommit.begin() + i);
}
if (auto* tx = logging.mActiveRfaTxToCommit.load(std::memory_order_relaxed); tx != nullptr) {
maxCommitTsRfa = tx->mCommitTs;
tx->mState = TxState::kCommitted;
logging.mActiveRfaTxToCommit.store(nullptr);
LS_DLOG("Transaction without remote dependency committed, workerId={}, "
"startTs={}, commitTs={}",
workerId, tx->mStartTs, tx->mCommitTs);
}

// Has committed transaction
Expand Down
5 changes: 2 additions & 3 deletions src/concurrency/HistoryStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,7 @@ void HistoryStorage::PutVersion(TXID txId, COMMANDID commandId, TREEID treeId, b
}

bool HistoryStorage::GetVersion(TXID newerTxId, COMMANDID newerCommandId,
const bool isRemoveCommand,
std::function<void(const uint8_t*, uint64_t)> cb) {
const bool isRemoveCommand, std::function<void(Slice)> cb) {
volatile BasicKV* btree = (isRemoveCommand) ? mRemoveIndex : mUpdateIndex;
const uint64_t keySize = sizeof(newerTxId) + sizeof(newerCommandId);
uint8_t keyBuffer[keySize];
Expand All @@ -114,7 +113,7 @@ bool HistoryStorage::GetVersion(TXID newerTxId, COMMANDID newerCommandId,
BasicKV* kv = const_cast<BasicKV*>(btree);
auto ret = kv->Lookup(key, [&](const Slice& payload) {
const auto& versionContainer = *VersionMeta::From(payload.data());
cb(versionContainer.mPayload, payload.length() - sizeof(VersionMeta));
cb({versionContainer.mPayload, payload.length() - sizeof(VersionMeta)});
});

if (ret == OpCode::kNotFound) {
Expand Down
6 changes: 2 additions & 4 deletions src/concurrency/WorkerContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -127,11 +127,9 @@ void WorkerContext::CommitTx() {

// for group commit
if (mActiveTx.mHasRemoteDependency) {
std::unique_lock<std::mutex> g(mLogging.mTxToCommitMutex);
mLogging.mTxToCommit.push_back(mActiveTx);
mLogging.mActiveTxToCommit.store(&mActiveTx);
} else {
std::unique_lock<std::mutex> g(mLogging.mRfaTxToCommitMutex);
mLogging.mRfaTxToCommit.push_back(mActiveTx);
mLogging.mActiveRfaTxToCommit.store(&mActiveTx);
}

// Cleanup versions in history tree
Expand Down

0 comments on commit 89c6ec8

Please sign in to comment.