From 18f55ede77d6cc823b32899133e81ad7372d00f3 Mon Sep 17 00:00:00 2001 From: Jian Zhang Date: Mon, 9 Sep 2024 23:19:27 +0800 Subject: [PATCH] perf: support scalable logging Signed-off-by: Jian Zhang --- benchmarks/ycsb/YcsbWiredTiger.hpp | 2 +- include/leanstore/LeanStore.hpp | 12 +- .../core/PessimisticExclusiveIterator.hpp | 19 +- .../btree/core/PessimisticIterator.hpp | 4 +- .../buffer-manager/GuardedBufferFrame.hpp | 7 +- include/leanstore/concurrency/CRManager.hpp | 11 +- .../leanstore/concurrency/GroupCommitter.hpp | 94 --------- include/leanstore/concurrency/Logging.hpp | 123 ++---------- include/leanstore/concurrency/LoggingImpl.hpp | 5 +- include/leanstore/concurrency/Transaction.hpp | 15 +- include/leanstore/concurrency/WalBuffer.hpp | 187 ++++++++++++++++++ .../concurrency/WalPayloadHandler.hpp | 15 +- .../leanstore/concurrency/WorkerContext.hpp | 41 +++- src/LeanStore.cpp | 46 ++--- src/btree/BasicKV.cpp | 4 +- src/btree/TransactionKV.cpp | 3 - src/btree/core/BTreeGeneric.cpp | 8 +- src/buffer-manager/BufferManager.cpp | 8 +- src/concurrency/CRManager.cpp | 15 +- src/concurrency/GroupCommitter.cpp | 185 ----------------- src/concurrency/HistoryStorage.cpp | 10 +- src/concurrency/Logging.cpp | 102 +--------- src/concurrency/Recovery.cpp | 43 ++-- src/concurrency/WorkerContext.cpp | 86 +++----- 24 files changed, 367 insertions(+), 678 deletions(-) delete mode 100644 include/leanstore/concurrency/GroupCommitter.hpp create mode 100644 include/leanstore/concurrency/WalBuffer.hpp delete mode 100644 src/concurrency/GroupCommitter.cpp diff --git a/benchmarks/ycsb/YcsbWiredTiger.hpp b/benchmarks/ycsb/YcsbWiredTiger.hpp index 52535d1f..a2358f0d 100644 --- a/benchmarks/ycsb/YcsbWiredTiger.hpp +++ b/benchmarks/ycsb/YcsbWiredTiger.hpp @@ -198,7 +198,7 @@ class YcsbWiredTiger : public YcsbExecutor { std::string configString( "create, direct_io=[data, log, checkpoint], " - "log=(enabled=true,archive=true), statistics_log=(wait=1), " + "log=(enabled),transaction_sync=(enabled=true,method=none), statistics_log=(wait=1), " "statistics=(all, clear), session_max=2000, eviction=(threads_max=4), cache_size=" + std::to_string(FLAGS_ycsb_mem_kb / 1024) + "M"); int ret = wiredtiger_open(dataDir.c_str(), nullptr, configString.c_str(), &mConn); diff --git a/include/leanstore/LeanStore.hpp b/include/leanstore/LeanStore.hpp index 57d942de..cd0f6fcd 100644 --- a/include/leanstore/LeanStore.hpp +++ b/include/leanstore/LeanStore.hpp @@ -8,6 +8,7 @@ #include #include #include +#include #include #include #include @@ -52,9 +53,6 @@ class LeanStore { //! The file descriptor for pages int32_t mPageFd; - //! The file descriptor for write-ahead log - int32_t mWalFd; - //! 
The tree registry std::unique_ptr mTreeRegistry; @@ -144,8 +142,8 @@ class LeanStore { return std::string(mStoreOption->mStoreDir) + "/db.pages"; } - std::string GetWalFilePath() const { - return std::string(mStoreOption->mStoreDir) + "/db.wal"; + std::string GetDbWalPath(uint64_t workerId) const { + return std::format("{}/worker_{}.wal", getDbWalDir(), workerId); } private: @@ -163,6 +161,10 @@ class LeanStore { void deserializeFlags(); void initPageAndWalFd(); + + std::string getDbWalDir() const { + return std::format("{}/wal", mStoreOption->mStoreDir); + } }; } // namespace leanstore diff --git a/include/leanstore/btree/core/PessimisticExclusiveIterator.hpp b/include/leanstore/btree/core/PessimisticExclusiveIterator.hpp index e451ddf7..a5f94e34 100644 --- a/include/leanstore/btree/core/PessimisticExclusiveIterator.hpp +++ b/include/leanstore/btree/core/PessimisticExclusiveIterator.hpp @@ -2,7 +2,9 @@ #include "leanstore/KVInterface.hpp" #include "leanstore/btree/core/PessimisticIterator.hpp" +#include "leanstore/concurrency/WorkerContext.hpp" #include "leanstore/utils/CounterUtil.hpp" +#include "leanstore/utils/Defer.hpp" #include "leanstore/utils/Log.hpp" #include "leanstore/utils/RandomGenerator.hpp" #include "leanstore/utils/UserThread.hpp" @@ -63,7 +65,9 @@ class PessimisticExclusiveIterator : public PessimisticIterator { } void SplitForKey(Slice key) { - auto sysTxId = mBTree.mStore->AllocSysTxTs(); + TXID sysTxId = cr::WorkerContext::My().StartSysTx(); + SCOPED_DEFER(cr::WorkerContext::My().CommitSysTx()); + while (true) { JUMPMU_TRY() { if (!Valid() || !KeyInCurrentNode(key)) { @@ -163,18 +167,18 @@ class PessimisticExclusiveIterator : public PessimisticIterator { mSlotId = -1; JUMPMU_TRY() { - TXID sysTxId = mBTree.mStore->AllocSysTxTs(); + TXID sysTxId = cr::WorkerContext::My().StartSysTx(); mBTree.TrySplitMayJump(sysTxId, *mGuardedLeaf.mBf, splitSlot); + cr::WorkerContext::My().CommitSysTx(); COUNTER_INC(&leanstore::cr::tlsPerfCounters.mContentionSplitSucceed); - LS_DLOG("[Contention Split] succeed, pageId={}, contention pct={}, split " - "slot={}", + LS_DLOG("[Contention Split] succeed, pageId={}, contentionPct={}, splitSlot={}", mGuardedLeaf.mBf->mHeader.mPageId, contentionPct, splitSlot); } JUMPMU_CATCH() { COUNTER_INC(&leanstore::cr::tlsPerfCounters.mContentionSplitFailed); - Log::Info("[Contention Split] contention split failed, pageId={}, contention " - "pct={}, split slot={}", + Log::Info("[Contention Split] contention split failed, pageId={}, " + "contentionPct={}, splitSlot={}", mGuardedLeaf.mBf->mHeader.mPageId, contentionPct, splitSlot); } } @@ -196,8 +200,9 @@ class PessimisticExclusiveIterator : public PessimisticIterator { mGuardedLeaf.unlock(); mSlotId = -1; JUMPMU_TRY() { - TXID sysTxId = mBTree.mStore->AllocSysTxTs(); + TXID sysTxId = cr::WorkerContext::My().StartSysTx(); mBTree.TryMergeMayJump(sysTxId, *mGuardedLeaf.mBf); + cr::WorkerContext::My().CommitSysTx(); } JUMPMU_CATCH() { LS_DLOG("TryMergeIfNeeded failed, pageId={}", mGuardedLeaf.mBf->mHeader.mPageId); diff --git a/include/leanstore/btree/core/PessimisticIterator.hpp b/include/leanstore/btree/core/PessimisticIterator.hpp index ef1d0dee..018c1c2e 100644 --- a/include/leanstore/btree/core/PessimisticIterator.hpp +++ b/include/leanstore/btree/core/PessimisticIterator.hpp @@ -6,6 +6,7 @@ #include "leanstore/Slice.hpp" #include "leanstore/btree/core/BTreeNode.hpp" #include "leanstore/buffer-manager/GuardedBufferFrame.hpp" +#include "leanstore/concurrency/WorkerContext.hpp" #include 
"leanstore/sync/HybridLatch.hpp" #include "leanstore/utils/Log.hpp" #include "leanstore/utils/UserThread.hpp" @@ -397,8 +398,9 @@ inline void PessimisticIterator::Next() { if (mGuardedLeaf->mNumSlots == 0) { SetCleanUpCallback([&, toMerge = mGuardedLeaf.mBf]() { JUMPMU_TRY() { - TXID sysTxId = mBTree.mStore->AllocSysTxTs(); + TXID sysTxId = cr::WorkerContext::My().StartSysTx(); mBTree.TryMergeMayJump(sysTxId, *toMerge, true); + cr::WorkerContext::My().CommitSysTx(); } JUMPMU_CATCH() { } diff --git a/include/leanstore/buffer-manager/GuardedBufferFrame.hpp b/include/leanstore/buffer-manager/GuardedBufferFrame.hpp index 2f9a48ad..f634ddd4 100644 --- a/include/leanstore/buffer-manager/GuardedBufferFrame.hpp +++ b/include/leanstore/buffer-manager/GuardedBufferFrame.hpp @@ -158,9 +158,6 @@ class GuardedBufferFrame { // update system transaction id mBf->mPage.mSysTxId = sysTxId; - - // update the maximum system transaction id written by the worker - cr::WorkerContext::My().mLogging.UpdateSysTxWrittern(sysTxId); } //! Check remote dependency @@ -172,8 +169,8 @@ class GuardedBufferFrame { } if (mBf->mHeader.mLastWriterWorker != cr::WorkerContext::My().mWorkerId && - mBf->mPage.mSysTxId > cr::ActiveTx().mMaxObservedSysTxId) { - cr::ActiveTx().mMaxObservedSysTxId = mBf->mPage.mSysTxId; + mBf->mPage.mSysTxId > cr::ActiveTx().mDependentSysTx) { + cr::ActiveTx().mDependentSysTx = mBf->mPage.mSysTxId; cr::ActiveTx().mHasRemoteDependency = true; } } diff --git a/include/leanstore/concurrency/CRManager.hpp b/include/leanstore/concurrency/CRManager.hpp index 9a91dd14..333d0f97 100644 --- a/include/leanstore/concurrency/CRManager.hpp +++ b/include/leanstore/concurrency/CRManager.hpp @@ -14,7 +14,6 @@ class LeanStore; namespace cr { struct WaterMarkInfo; -class GroupCommitter; //! Manages a fixed number of worker threads and group committer threads. class CRManager { @@ -30,19 +29,13 @@ class CRManager { WaterMarkInfo mGlobalWmkInfo; - //! The group committer thread, created and started if WAL is enabled when the - //! CRManager instance is created. - /// - //! NOTE: It should be created after all the worker threads are created and - //! started. - std::unique_ptr mGroupCommitter; - public: + //! Construct a CRManager. CRManager(leanstore::LeanStore* store); + //! Destruct a CRManager. ~CRManager(); -public: // State Serialization StringMap Serialize(); diff --git a/include/leanstore/concurrency/GroupCommitter.hpp b/include/leanstore/concurrency/GroupCommitter.hpp deleted file mode 100644 index 106cfe0b..00000000 --- a/include/leanstore/concurrency/GroupCommitter.hpp +++ /dev/null @@ -1,94 +0,0 @@ -#pragma once - -#include "leanstore/LeanStore.hpp" -#include "leanstore/Units.hpp" -#include "leanstore/utils/AsyncIo.hpp" -#include "leanstore/utils/UserThread.hpp" - -#include -#include - -#include -#include -#include -#include - -namespace leanstore::cr { - -class WorkerContext; -class WalFlushReq; - -//! The group committer thread is responsible for committing transactions in batches. It collects -//! wal records from all the worker threads, writes them to the wal file with libaio, and determines -//! the commitable transactions based on the min flushed GSN and min flushed transaction ID. -class GroupCommitter : public leanstore::utils::UserThread { -public: - leanstore::LeanStore* mStore; - - //! File descriptor of the underlying WAL file. - const int32_t mWalFd; - - //! Start file offset of the next WalEntry. - uint64_t mWalSize; - - //! The minimum flushed system transaction ID among all worker threads. 
User transactions whose - //! max observed system transaction ID not larger than it can be committed safely. - std::atomic mGlobalMinFlushedSysTx; - - //! All the workers. - std::vector& mWorkerCtxs; - - //! The libaio wrapper. - utils::AsyncIo mAIo; - -public: - GroupCommitter(leanstore::LeanStore* store, int32_t walFd, std::vector& workers, - int cpu) - : UserThread(store, "GroupCommitter", cpu), - mStore(store), - mWalFd(walFd), - mWalSize(0), - mGlobalMinFlushedSysTx(0), - mWorkerCtxs(workers), - mAIo(workers.size() * 2 + 2) { - } - - virtual ~GroupCommitter() override = default; - -protected: - virtual void runImpl() override; - -private: - //! Phase 1: collect wal records from all the worker threads. Collected wal records are written to - //! libaio IOCBs. - //! - //! @param[out] minFlushedSysTx the min flushed system transaction ID - //! @param[out] minFlushedUsrTx the min flushed user transaction ID - //! @param[out] numRfaTxs number of transactions without dependency - //! @param[out] walFlushReqCopies snapshot of the flush requests - void collectWalRecords(TXID& minFlushedSysTx, TXID& minFlushedUsrTx, - std::vector& numRfaTxs, - std::vector& walFlushReqCopies); - - //! Phase 2: write all the collected wal records to the wal file with libaio. - void flushWalRecords(); - - //! Phase 3: determine the commitable transactions based on minFlushedGSN and minFlushedTxId. - //! - //! @param[in] minFlushedSysTx the min flushed system transaction ID - //! @param[in] minFlushedUsrTx the min flushed user transaction ID - //! @param[in] numRfaTxs number of transactions without dependency - //! @param[in] walFlushReqCopies snapshot of the flush requests - void determineCommitableTx(TXID minFlushedSysTx, TXID minFlushedUsrTx, - const std::vector& numRfaTxs, - const std::vector& walFlushReqCopies); - - //! Append a wal entry to libaio IOCBs. - //! - //! @param[in] buf the wal entry buffer - //! @param[in] lower the begin offset of the wal entry in the buffer - //! @param[in] upper the end offset of the wal entry in the buffer - void append(uint8_t* buf, uint64_t lower, uint64_t upper); -}; - -} // namespace leanstore::cr diff --git a/include/leanstore/concurrency/Logging.hpp b/include/leanstore/concurrency/Logging.hpp index d000da62..7478e6ce 100644 --- a/include/leanstore/concurrency/Logging.hpp +++ b/include/leanstore/concurrency/Logging.hpp @@ -1,149 +1,56 @@ #pragma once -#include "leanstore-c/PerfCounters.h" #include "leanstore/Units.hpp" -#include "leanstore/concurrency/Transaction.hpp" -#include "leanstore/sync/OptimisticGuarded.hpp" -#include "leanstore/utils/CounterUtil.hpp" +#include "leanstore/concurrency/WalBuffer.hpp" -#include #include -#include -#include -#include +#include + +#include namespace leanstore::cr { //! forward declarations -class WalEntry; class WalEntryComplex; -//! Used to sync wal flush request between group committer and worker. -struct WalFlushReq { - //! Used for optimistic locking. - uint64_t mVersion = 0; - - //! The offset in the wal ring buffer. - uint64_t mWalBuffered = 0; - - //! The maximum system transasction ID written by the worker. - //! NOTE: can only be updated when all the WAL entries belonging to the system transaction are - //! written to the wal ring buffer. - TXID mSysTxWrittern = 0; - - //! ID of the current transaction. - //! NOTE: can only be updated when all the WAL entries belonging to the user transaction are - //! written to the wal ring buffer. 
- TXID mCurrTxId = 0; - - WalFlushReq(uint64_t walBuffered = 0, uint64_t sysTxWrittern = 0, TXID currTxId = 0) - : mVersion(0), - mWalBuffered(walBuffered), - mSysTxWrittern(sysTxWrittern), - mCurrTxId(currTxId) { - } -}; - template class WalPayloadHandler; //! Helps to transaction concurrenct control and write-ahead logging. class Logging { public: - LID mPrevLSN; + WalBuffer mWalBuffer; //! The active complex WalEntry for the current transaction, usually used for insert, update, //! delete, or btree related operations. - //! - //! NOTE: Either mActiveWALEntrySimple or mActiveWALEntryComplex is effective during transaction - //! processing. + //! NOTE: only effective during transaction processing. WalEntryComplex* mActiveWALEntryComplex; - //! Protects mTxToCommit - std::mutex mTxToCommitMutex; - - //! The queue for each worker thread to store pending-to-commit transactions which have remote - //! dependencies. - std::vector mTxToCommit; - - //! Protects mTxToCommit - std::mutex mRfaTxToCommitMutex; - - //! The queue for each worker thread to store pending-to-commit transactions which doesn't have - //! any remote dependencies. - std::vector mRfaTxToCommit; - - //! Represents the maximum commit timestamp in the worker. Transactions in the worker are - //! committed if their commit timestamps are smaller than it. - //! - //! Updated by group committer - std::atomic mSignaledCommitTs = 0; - - storage::OptimisticGuarded mWalFlushReq; - - //! The ring buffer of the current worker thread. All the wal entries of the current worker are - //! writtern to this ring buffer firstly, then flushed to disk by the group commit thread. - alignas(512) uint8_t* mWalBuffer; - - //! The size of the wal ring buffer. - uint64_t mWalBufferSize; - //! Used to track the write order of wal entries. LID mLsnClock = 0; - //! The maximum writtern system transaction ID in the worker. - TXID mSysTxWrittern = 0; - - //! The written offset of the wal ring buffer. - uint64_t mWalBuffered = 0; + LID mPrevLSN; - //! Represents the flushed offset in the wal ring buffer. The wal ring buffer is firstly written - //! by the worker thread then flushed to disk file by the group commit thread. - std::atomic mWalFlushed = 0; + //! The maximum committed system transasction ID on the worker. A system transaction is committed + //! only when all its WAL records are written and flushed to disk. + std::atomic mActiveSysTx = 0; - //! The first WAL record of the current active transaction. - uint64_t mTxWalBegin; + inline static std::atomic sGlobalMinCommittedSysTx = 0; public: - void UpdateSignaledCommitTs(const LID signaledCommitTs) { - mSignaledCommitTs.store(signaledCommitTs, std::memory_order_release); + Logging(uint64_t walBufferCapacity, std::string walFilePath, uint64_t ioBatchSize, + bool createFromScratch) + : mWalBuffer(walBufferCapacity, std::move(walFilePath), ioBatchSize, createFromScratch) { } - void WaitToCommit(const TXID commitTs) { - COUNTER_INC(&tlsPerfCounters.mTxCommitWait); - while (!(commitTs <= mSignaledCommitTs.load())) { - } - } - - void ReserveContiguousBuffer(uint32_t requestedSize); - - //! Iterate over current TX entries - void IterateCurrentTxWALs(std::function callback); + ~Logging() = default; void WriteWalTxAbort(); void WriteWalTxFinish(); - void WriteWalCarriageReturn(); template WalPayloadHandler ReserveWALEntryComplex(uint64_t payloadSize, PID pageId, LID psn, TREEID treeId, Args&&... args); - - //! Submits wal record to group committer when it is ready to flush to disk. - //! 
@param totalSize size of the wal record to be flush. - void SubmitWALEntryComplex(uint64_t totalSize); - - void UpdateSysTxWrittern(TXID sysTxId) { - mSysTxWrittern = std::max(mSysTxWrittern, sysTxId); - } - -private: - void publishWalBufferedOffset(); - - void publishWalFlushReq(); - - //! Calculate the continuous free space left in the wal ring buffer. Return - //! size of the contiguous free space. - uint32_t walContiguousFreeSpace(); }; } // namespace leanstore::cr \ No newline at end of file diff --git a/include/leanstore/concurrency/LoggingImpl.hpp b/include/leanstore/concurrency/LoggingImpl.hpp index d9591b73..3ebb55d9 100644 --- a/include/leanstore/concurrency/LoggingImpl.hpp +++ b/include/leanstore/concurrency/LoggingImpl.hpp @@ -21,10 +21,9 @@ WalPayloadHandler Logging::ReserveWALEntryComplex(uint64_t payloadSize, PID p // update prev lsn in the end SCOPED_DEFER(mPrevLSN = mActiveWALEntryComplex->mLsn); - auto entryLSN = mLsnClock++; - auto* entryPtr = mWalBuffer + mWalBuffered; auto entrySize = sizeof(WalEntryComplex) + payloadSize; - ReserveContiguousBuffer(entrySize); + auto* entryPtr = mWalBuffer.Get(entrySize); + auto entryLSN = mLsnClock++; mActiveWALEntryComplex = new (entryPtr) WalEntryComplex(entryLSN, prevLsn, entrySize, WorkerContext::My().mWorkerId, diff --git a/include/leanstore/concurrency/Transaction.hpp b/include/leanstore/concurrency/Transaction.hpp index 63c799ac..f4a7c4e7 100644 --- a/include/leanstore/concurrency/Transaction.hpp +++ b/include/leanstore/concurrency/Transaction.hpp @@ -5,8 +5,7 @@ #include "leanstore/Units.hpp" #include "leanstore/utils/UserThread.hpp" -namespace leanstore { -namespace cr { +namespace leanstore::cr { enum class TxState { kIdle, kStarted, kCommitted, kAborted }; @@ -46,7 +45,7 @@ class Transaction { //! Maximum observed system transaction id during transaction processing. Used to track //! transaction dependencies. - TXID mMaxObservedSysTxId = 0; + TXID mDependentSysTx = 0; //! Whether the transaction has any remote dependencies. Currently, we only support SI isolation //! level, a user transaction can only depend on a system transaction executed in a remote worker @@ -83,7 +82,7 @@ class Transaction { mState = TxState::kStarted; mStartTs = 0; mCommitTs = 0; - mMaxObservedSysTxId = 0; + mDependentSysTx = 0; mHasRemoteDependency = false; mTxMode = mode; mTxIsolationLevel = level; @@ -91,12 +90,6 @@ class Transaction { mIsDurable = utils::tlsStore->mStoreOption->mEnableWal; mWalExceedBuffer = false; } - - //! Check whether a user transaction with remote dependencies can be committed. - bool CanCommit(TXID minFlushedSysTx, TXID minFlushedUsrTx) { - return mMaxObservedSysTxId <= minFlushedSysTx && mStartTs <= minFlushedUsrTx; - } }; -} // namespace cr -} // namespace leanstore +} // namespace leanstore::cr diff --git a/include/leanstore/concurrency/WalBuffer.hpp b/include/leanstore/concurrency/WalBuffer.hpp new file mode 100644 index 00000000..253b4244 --- /dev/null +++ b/include/leanstore/concurrency/WalBuffer.hpp @@ -0,0 +1,187 @@ +#pragma once + +#include "leanstore/LeanStore.hpp" +#include "leanstore/concurrency/WalEntry.hpp" +#include "leanstore/utils/AsyncIo.hpp" +#include "leanstore/utils/Misc.hpp" +#include "leanstore/utils/UserThread.hpp" + +#include +#include +#include + +#include + +namespace leanstore::cr { + +class WalBuffer { +private: + //! The wal buffer. + utils::AlignedBuffer<512> mBuffer; + + //! Capacity of the wal buffer. + const uint64_t mCapacity; + + //! 
Position of buffered bytes; + uint64_t mPosBuffered; + + //! Position of the persisted bytes; + uint64_t mPosPersisted; + + const std::string mWalFilePath; + + int32_t mWalFd; + + uint64_t mWalFileSize; + + utils::AsyncIo mAio; + +public: + WalBuffer(uint64_t capacity, std::string walFilePath, uint64_t ioBatchSize, + bool createFromScratch) + : mBuffer(capacity), + mCapacity(capacity), + mPosBuffered(0), + mPosPersisted(0), + mWalFilePath(std::move(walFilePath)), + mWalFileSize(0), + mAio(ioBatchSize) { + // clear buffer + std::memset(mBuffer.Get(), 0, mCapacity); + + // open wal file + int flags = createFromScratch ? O_TRUNC | O_CREAT | O_RDWR | O_DIRECT : O_RDWR | O_DIRECT; + mWalFd = open(mWalFilePath.c_str(), flags, 0666); + if (mWalFd == -1) { + Log::Fatal("Could not open file at: {}, error: {}", mWalFilePath, strerror(errno)); + } + Log::Info("Init walFd succeed, walFd={}, walFile={}", mWalFd, mWalFilePath); + } + + ~WalBuffer() { + if (close(mWalFd) == -1) { + Log::Error("Close WAL file failed: {}, walFile={}", strerror(errno), mWalFilePath); + } else { + Log::Info("Close WAL file succeed, walFile={}", mWalFilePath); + } + } + + //! Get a contiguous buffer of the given size, return the buffer address. + uint8_t* Get(uint64_t size) { + ensureContiguousBuffer(size); + return mBuffer.Get() + mPosBuffered; + } + + //! Advance the buffer pointer by the given size. + void Advance(uint64_t size) { + mPosBuffered += size; + } + + //! Persist all the buffered bytes to the wal file. Called when the transaction is committed, + //! aborted, or no more space in the buffer. + void Persist(); + + //! Iterate all the wal records in the descending order. It firstly iterates the buffered records + //! and then the persisted records. + void VisitWalRecordsDesc(std::function callback [[maybe_unused]]) { + // TODO: implement + } + +private: + void ensureContiguousBuffer(uint64_t size); + + void carriageReturn(); + + void persistRange(uint64_t lowerBufferPos, uint64_t upperBufferPos); + + void writeToWalFile(); + +}; // WalBuffer + +inline void WalBuffer::Persist() { + if (mPosPersisted == mPosBuffered) { + return; + } + + if (mPosPersisted < mPosBuffered) { + persistRange(mPosPersisted, mPosBuffered); + } else { + persistRange(mPosPersisted, mCapacity); + persistRange(0, mPosBuffered); + } + + if (!mAio.IsEmpty()) { + writeToWalFile(); + } + + mPosPersisted = mPosBuffered; +} + +inline void WalBuffer::ensureContiguousBuffer(uint64_t size) { + while (true) { + if (mPosPersisted <= mPosBuffered) { + if (mCapacity - mPosBuffered >= size) { + return; + } + + // not enough space + carriageReturn(); + if (mPosPersisted == 0) { + Persist(); + } + mPosBuffered = 0; + continue; + } + + if (mPosPersisted - mPosBuffered >= size) { + return; + } + + // not enough space + Persist(); + continue; + } +} + +inline void WalBuffer::carriageReturn() { + auto entrySize = mCapacity - mPosBuffered; + auto* entryPtr = mBuffer.Get() + mPosBuffered; + new (entryPtr) WalCarriageReturn(entrySize); + mPosBuffered = mCapacity; +} + +inline void WalBuffer::persistRange(uint64_t lowerBufferPos, uint64_t upperBufferPos) { + static constexpr uint64_t kAligment = 4096; + auto fileOffsetAligned = utils::AlignDown(mWalFileSize, kAligment); + auto bufferOffsetAligned = utils::AlignDown(lowerBufferPos, kAligment); + auto upperAligned = utils::AlignUp(upperBufferPos, kAligment); + while (bufferOffsetAligned < upperAligned) { + if (mAio.IsFull()) { + writeToWalFile(); + } + mAio.PrepareWrite(mWalFd, mBuffer.Get() + bufferOffsetAligned, 
kAligment, fileOffsetAligned);
+    bufferOffsetAligned += kAligment;
+    fileOffsetAligned += kAligment;
+  }
+  mWalFileSize += upperBufferPos - lowerBufferPos;
+}
+
+inline void WalBuffer::writeToWalFile() {
+  if (auto res = mAio.SubmitAll(); !res) {
+    Log::Error("Failed to submit all IO, error={}", res.error().ToString());
+  }
+
+  timespec timeout = {1, 0}; // 1s
+  if (auto res = mAio.WaitAll(&timeout); !res) {
+    Log::Error("Failed to wait all IO, error={}", res.error().ToString());
+  }
+
+  if (utils::tlsStore->mStoreOption->mEnableWalFsync) {
+    auto failed = fdatasync(mWalFd);
+    if (failed) {
+      Log::Error("fdatasync failed, errno={}, error={}", errno, strerror(errno));
+    }
+  }
+}
+
+} // namespace leanstore::cr
\ No newline at end of file
diff --git a/include/leanstore/concurrency/WalPayloadHandler.hpp b/include/leanstore/concurrency/WalPayloadHandler.hpp
index 33a8e097..8e327b6a 100644
--- a/include/leanstore/concurrency/WalPayloadHandler.hpp
+++ b/include/leanstore/concurrency/WalPayloadHandler.hpp
@@ -1,6 +1,5 @@
 #pragma once
 
-#include "leanstore/concurrency/GroupCommitter.hpp"
 #include "leanstore/concurrency/WorkerContext.hpp"
 
 namespace leanstore::cr {
@@ -23,21 +22,17 @@ class WalPayloadHandler {
   WalPayloadHandler(T* walPayload, uint64_t size) : mWalPayload(walPayload), mTotalSize(size) {
   }
 
-public:
-  inline T* operator->() {
+  T* operator->() {
     return mWalPayload;
   }
 
-  inline T& operator*() {
+  T& operator*() {
     return *mWalPayload;
   }
 
-  void SubmitWal();
+  void SubmitWal() {
+    cr::WorkerContext::My().mLogging.mWalBuffer.Advance(mTotalSize);
+  }
 };
 
-template <typename T>
-inline void WalPayloadHandler<T>::SubmitWal() {
-  cr::WorkerContext::My().mLogging.SubmitWALEntryComplex(mTotalSize);
-}
-
 } // namespace leanstore::cr
\ No newline at end of file
diff --git a/include/leanstore/concurrency/WorkerContext.hpp b/include/leanstore/concurrency/WorkerContext.hpp
index 8dc00d6a..4de601cb 100644
--- a/include/leanstore/concurrency/WorkerContext.hpp
+++ b/include/leanstore/concurrency/WorkerContext.hpp
@@ -53,7 +53,7 @@ class WorkerContext {
                 leanstore::LeanStore* store);
 
   //! Destruct a WorkerContext.
-  ~WorkerContext();
+  ~WorkerContext() = default;
 
   //! Whether a user transaction is started.
   bool IsTxStarted() {
@@ -70,6 +70,22 @@ class WorkerContext {
   //! Aborts a user transaction.
   void AbortTx();
 
+  //! Starts a system transaction.
+  TXID StartSysTx() {
+    auto sysTx = mStore->AllocSysTxTs();
+    mLogging.mActiveSysTx.store(sysTx);
+    return sysTx;
+  }
+
+  //! Commits a system transaction.
+  void CommitSysTx() {
+    // 1. Write all the buffered write-ahead logs
+    mLogging.mWalBuffer.Persist();
+
+    // 2. Update the committed system transaction ID in the end
+    mLogging.mActiveSysTx.store(0);
+  }
+
   //! Get the PerfCounters of the current worker.
   PerfCounters* GetPerfCounters();
 
@@ -91,9 +107,30 @@ class WorkerContext {
   static bool InWorker() {
     return WorkerContext::sTlsWorkerCtxRaw != nullptr;
   }
+
+private:
+  void waitDependencyToCommit() {
+    while (mActiveTx.mDependentSysTx > Logging::sGlobalMinCommittedSysTx.load()) {
+      TXID curSysTso = mStore->GetSysTxTs();
+
+      // collect min committed system transaction ID
+      TXID minActiveSysTx = std::numeric_limits<TXID>::max();
+      for (auto& workerCtx : mAllWorkers) {
+        auto activeSysTx = workerCtx->mLogging.mActiveSysTx.load();
+        if (activeSysTx < minActiveSysTx && activeSysTx != 0) {
+          minActiveSysTx = activeSysTx;
+        }
+      }
+
+      if (minActiveSysTx != std::numeric_limits<TXID>::max()) {
+        Logging::sGlobalMinCommittedSysTx.store(minActiveSysTx - 1);
+      } else {
+        Logging::sGlobalMinCommittedSysTx.store(curSysTso);
+      }
+    }
+  }
 };
 
-// Shortcuts
 inline Transaction& ActiveTx() {
   return cr::WorkerContext::My().mActiveTx;
 }
diff --git a/src/LeanStore.cpp b/src/LeanStore.cpp
index 89488059..71013b08 100644
--- a/src/LeanStore.cpp
+++ b/src/LeanStore.cpp
@@ -52,7 +52,9 @@ Result<std::unique_ptr<LeanStore>> LeanStore::Open(StoreOption* option) {
   }
 
   Log::Init(option);
-  return std::make_unique<LeanStore>(option);
+  auto store = std::make_unique<LeanStore>(option);
+
+  return store;
 }
 
 LeanStore::LeanStore(StoreOption* option) : mStoreOption(option) {
@@ -61,6 +63,14 @@ LeanStore::LeanStore(StoreOption* option) : mStoreOption(option) {
   Log::Info("LeanStore starting ...");
   SCOPED_DEFER(Log::Info("LeanStore started"));
 
+  // clean wal dir
+  if (mStoreOption->mCreateFromScratch) {
+    Log::Info("Clean wal dir: {}", getDbWalDir());
+    std::filesystem::path walDirPath = getDbWalDir();
+    std::filesystem::remove_all(walDirPath);
+    std::filesystem::create_directories(walDirPath);
+  }
+
   initPageAndWalFd();
 
   // create global btree catalog
@@ -90,10 +100,7 @@ LeanStore::LeanStore(StoreOption* option) : mStoreOption(option) {
 }
 
 void LeanStore::initPageAndWalFd() {
-  SCOPED_DEFER({
-    LS_DCHECK(fcntl(mPageFd, F_GETFL) != -1);
-    LS_DCHECK(fcntl(mWalFd, F_GETFL) != -1);
-  });
+  SCOPED_DEFER(LS_DCHECK(fcntl(mPageFd, F_GETFL) != -1));
 
   // Create a new instance on the specified DB file
   if (mStoreOption->mCreateFromScratch) {
@@ -105,13 +112,6 @@ void LeanStore::initPageAndWalFd() {
       Log::Fatal("Could not open file at: {}", dbFilePath);
     }
     Log::Info("Init page fd succeed, pageFd={}, pageFile={}", mPageFd, dbFilePath);
-
-    auto walFilePath = GetWalFilePath();
-    mWalFd = open(walFilePath.c_str(), flags, 0666);
-    if (mWalFd == -1) {
-      Log::Fatal("Could not open file at: {}", walFilePath);
-    }
-    Log::Info("Init wal fd succeed, walFd={}, walFile={}", mWalFd, walFilePath);
     return;
   }
 
@@ -127,15 +127,6 @@ void LeanStore::initPageAndWalFd() {
               dbFilePath);
   }
   Log::Info("Init page fd succeed, pageFd={}, pageFile={}", mPageFd, dbFilePath);
-
-  auto walFilePath = GetWalFilePath();
-  mWalFd = open(walFilePath.c_str(), flags, 0666);
-  if (mWalFd == -1) {
-    Log::Fatal("Recover failed, could not open file at: {}. 
The data is lost, " - "please create a new WAL file and start a new instance from it", - walFilePath); - } - Log::Info("Init wal fd succeed, walFd={}, walFile={}", mWalFd, walFilePath); } LeanStore::~LeanStore() { @@ -185,19 +176,6 @@ LeanStore::~LeanStore() { } else { Log::Info("Page file closed"); } - - { - auto walFilePath = GetWalFilePath(); - struct stat st; - if (stat(walFilePath.c_str(), &st) == 0) { - LS_DLOG("The size of {} is {} bytes", walFilePath, st.st_size); - } - } - if (close(mWalFd) == -1) { - perror("Failed to close WAL file: "); - } else { - Log::Info("WAL file closed"); - } } void LeanStore::ExecSync(uint64_t workerId, std::function job) { diff --git a/src/btree/BasicKV.cpp b/src/btree/BasicKV.cpp index 60473956..e6151862 100644 --- a/src/btree/BasicKV.cpp +++ b/src/btree/BasicKV.cpp @@ -6,6 +6,7 @@ #include "leanstore/btree/core/BTreeGeneric.hpp" #include "leanstore/btree/core/PessimisticExclusiveIterator.hpp" #include "leanstore/btree/core/PessimisticSharedIterator.hpp" +#include "leanstore/concurrency/WorkerContext.hpp" #include "leanstore/sync/HybridLatch.hpp" #include "leanstore/utils/Log.hpp" #include "leanstore/utils/Misc.hpp" @@ -334,8 +335,9 @@ OpCode BasicKV::RangeRemove(Slice startKey, Slice endKey, bool pageWise) { if (guardedLeaf->FreeSpaceAfterCompaction() >= BTreeNode::UnderFullSize()) { xIter.SetCleanUpCallback([&, toMerge = guardedLeaf.mBf] { JUMPMU_TRY() { - TXID sysTxId = mStore->AllocSysTxTs(); + TXID sysTxId = cr::WorkerContext::My().StartSysTx(); this->TryMergeMayJump(sysTxId, *toMerge); + cr::WorkerContext::My().CommitSysTx(); } JUMPMU_CATCH() { } diff --git a/src/btree/TransactionKV.cpp b/src/btree/TransactionKV.cpp index 7c6c259c..8d613cd0 100644 --- a/src/btree/TransactionKV.cpp +++ b/src/btree/TransactionKV.cpp @@ -663,9 +663,6 @@ SpaceCheckResult TransactionKV::CheckSpaceUtilization(BufferFrame& bf) { } guardedNode.ToExclusiveMayJump(); - TXID sysTxId = utils::tlsStore->AllocSysTxTs(); - guardedNode.SyncSystemTxId(sysTxId); - for (uint16_t i = 0; i < guardedNode->mNumSlots; i++) { auto& tuple = *Tuple::From(guardedNode->ValData(i)); if (tuple.mFormat == TupleFormat::kFat) { diff --git a/src/btree/core/BTreeGeneric.cpp b/src/btree/core/BTreeGeneric.cpp index 5c6b6b97..9e18004d 100644 --- a/src/btree/core/BTreeGeneric.cpp +++ b/src/btree/core/BTreeGeneric.cpp @@ -9,6 +9,7 @@ #include "leanstore/buffer-manager/BufferFrame.hpp" #include "leanstore/buffer-manager/BufferManager.hpp" #include "leanstore/buffer-manager/GuardedBufferFrame.hpp" +#include "leanstore/concurrency/WorkerContext.hpp" #include "leanstore/utils/Defer.hpp" #include "leanstore/utils/Log.hpp" #include "leanstore/utils/Misc.hpp" @@ -43,7 +44,8 @@ void BTreeGeneric::Init(leanstore::LeanStore* store, TREEID btreeId, BTreeConfig // Record WAL if (mConfig.mEnableWal) { - TXID sysTxId = mStore->AllocSysTxTs(); + TXID sysTxId = cr::WorkerContext::My().StartSysTx(); + SCOPED_DEFER(cr::WorkerContext::My().CommitSysTx()); auto rootWalHandler = xGuardedRoot.ReserveWALPayload(0, sysTxId, mTreeId, xGuardedRoot->mIsLeaf); @@ -476,8 +478,6 @@ BTreeGeneric::XMergeReturnCode BTreeGeneric::XMerge(GuardedBufferFrame xGuardedParent = std::move(guardedParent); // TODO(zz-jason): support wal and sync system tx id - // TXID sysTxId = utils::tlsStore->AllocSysTxTs(); - // xGuardedParent.SyncSystemTxId(sysTxId); XMergeReturnCode retCode = XMergeReturnCode::kPartialMerge; int16_t leftHand, rightHand, ret; @@ -498,8 +498,6 @@ BTreeGeneric::XMergeReturnCode BTreeGeneric::XMerge(GuardedBufferFrame 
xGuardedLeft(std::move(guardedNodes[leftHand - pos])); // TODO(zz-jason): support wal and sync system tx id - // xGuardedRight.SyncSystemTxId(sysTxId); - // xGuardedLeft.SyncSystemTxId(sysTxId); maxRight = leftHand; ret = mergeLeftIntoRight(xGuardedParent, leftHand, xGuardedLeft, xGuardedRight, leftHand == pos); diff --git a/src/buffer-manager/BufferManager.cpp b/src/buffer-manager/BufferManager.cpp index 75d82ac9..ea3ce7c6 100644 --- a/src/buffer-manager/BufferManager.cpp +++ b/src/buffer-manager/BufferManager.cpp @@ -5,7 +5,6 @@ #include "leanstore/buffer-manager/BufferFrame.hpp" #include "leanstore/buffer-manager/TreeRegistry.hpp" #include "leanstore/concurrency/CRManager.hpp" -#include "leanstore/concurrency/GroupCommitter.hpp" #include "leanstore/concurrency/Recovery.hpp" #include "leanstore/sync/HybridLatch.hpp" #include "leanstore/sync/ScopedHybridGuard.hpp" @@ -197,9 +196,10 @@ Result BufferManager::CheckpointBufferFrame(BufferFrame& bf) { } void BufferManager::RecoverFromDisk() { - auto recovery = std::make_unique( - mStore, 0, mStore->mCRManager->mGroupCommitter->mWalSize); - recovery->Run(); + // TODO: implement recovery + // auto recovery = std::make_unique( + // mStore, 0, mStore->mCRManager->mGroupCommitter->mWalSize); + // recovery->Run(); } uint64_t BufferManager::ConsumedPages() { diff --git a/src/concurrency/CRManager.cpp b/src/concurrency/CRManager.cpp index d1600b8e..aab31b40 100644 --- a/src/concurrency/CRManager.cpp +++ b/src/concurrency/CRManager.cpp @@ -2,7 +2,6 @@ #include "leanstore/LeanStore.hpp" #include "leanstore/btree/BasicKV.hpp" -#include "leanstore/concurrency/GroupCommitter.hpp" #include "leanstore/concurrency/HistoryStorage.hpp" #include "leanstore/concurrency/WorkerContext.hpp" #include "leanstore/concurrency/WorkerThread.hpp" @@ -13,7 +12,7 @@ namespace leanstore::cr { -CRManager::CRManager(leanstore::LeanStore* store) : mStore(store), mGroupCommitter(nullptr) { +CRManager::CRManager(leanstore::LeanStore* store) : mStore(store) { auto* storeOption = store->mStoreOption; // start all worker threads mWorkerCtxs.resize(storeOption->mWorkerThreads); @@ -32,13 +31,6 @@ CRManager::CRManager(leanstore::LeanStore* store) : mStore(store), mGroupCommitt mWorkerThreads.emplace_back(std::move(workerThread)); } - // start group commit thread - if (mStore->mStoreOption->mEnableWal) { - const int cpu = storeOption->mWorkerThreads; - mGroupCommitter = std::make_unique(mStore, mStore->mWalFd, mWorkerCtxs, cpu); - mGroupCommitter->Start(); - } - // create history storage for each worker // History tree should be created after worker thread and group committer are // started. 
@@ -49,8 +41,6 @@ CRManager::CRManager(leanstore::LeanStore* store) : mStore(store), mGroupCommitt } void CRManager::Stop() { - mGroupCommitter->Stop(); - for (auto& workerThread : mWorkerThreads) { workerThread->Stop(); } @@ -87,20 +77,17 @@ void CRManager::setupHistoryStorage4EachWorker() { } } -constexpr char kKeyWalSize[] = "wal_size"; constexpr char kKeyGlobalUsrTso[] = "global_user_tso"; constexpr char kKeyGlobalSysTso[] = "global_system_tso"; StringMap CRManager::Serialize() { StringMap map; - map[kKeyWalSize] = std::to_string(mGroupCommitter->mWalSize); map[kKeyGlobalUsrTso] = std::to_string(mStore->mUsrTso.load()); map[kKeyGlobalSysTso] = std::to_string(mStore->mSysTso.load()); return map; } void CRManager::Deserialize(StringMap map) { - mGroupCommitter->mWalSize = std::stoull(map[kKeyWalSize]); mStore->mUsrTso = std::stoull(map[kKeyGlobalUsrTso]); mStore->mSysTso = std::stoull(map[kKeyGlobalSysTso]); diff --git a/src/concurrency/GroupCommitter.cpp b/src/concurrency/GroupCommitter.cpp deleted file mode 100644 index 729671c8..00000000 --- a/src/concurrency/GroupCommitter.cpp +++ /dev/null @@ -1,185 +0,0 @@ -#include "leanstore/concurrency/GroupCommitter.hpp" - -#include "leanstore/concurrency/CRManager.hpp" -#include "leanstore/concurrency/WorkerContext.hpp" - -#include -#include -#include -#include -#include -#include - -namespace leanstore::cr { - -//! The alignment of the WAL record -constexpr size_t kAligment = 4096; - -void GroupCommitter::runImpl() { - TXID minFlushedSysTx = std::numeric_limits::max(); - TXID minFlushedUsrTx = std::numeric_limits::max(); - std::vector numRfaTxs(mWorkerCtxs.size(), 0); - std::vector walFlushReqCopies(mWorkerCtxs.size()); - - while (mKeepRunning) { - // phase 1 - collectWalRecords(minFlushedSysTx, minFlushedUsrTx, numRfaTxs, walFlushReqCopies); - - // phase 2 - if (!mAIo.IsEmpty()) { - flushWalRecords(); - } - - // phase 3 - determineCommitableTx(minFlushedSysTx, minFlushedUsrTx, numRfaTxs, walFlushReqCopies); - } -} - -void GroupCommitter::collectWalRecords(TXID& minFlushedSysTx, TXID& minFlushedUsrTx, - std::vector& numRfaTxs, - std::vector& walFlushReqCopies) { - minFlushedSysTx = std::numeric_limits::max(); - minFlushedUsrTx = std::numeric_limits::max(); - - for (auto workerId = 0u; workerId < mWorkerCtxs.size(); workerId++) { - auto& logging = mWorkerCtxs[workerId]->mLogging; - - // collect logging info - std::unique_lock guard(logging.mRfaTxToCommitMutex); - numRfaTxs[workerId] = logging.mRfaTxToCommit.size(); - guard.unlock(); - - auto lastReqVersion = walFlushReqCopies[workerId].mVersion; - auto version = logging.mWalFlushReq.Get(walFlushReqCopies[workerId]); - walFlushReqCopies[workerId].mVersion = version; - const auto& reqCopy = walFlushReqCopies[workerId]; - - if (reqCopy.mVersion == lastReqVersion) { - // no transaction log write since last round group commit, skip. 
- continue; - } - - if (reqCopy.mSysTxWrittern > 0) { - minFlushedSysTx = std::min(minFlushedSysTx, reqCopy.mSysTxWrittern); - } - if (reqCopy.mCurrTxId > 0) { - minFlushedUsrTx = std::min(minFlushedUsrTx, reqCopy.mCurrTxId); - } - - // prepare IOCBs on demand - const uint64_t buffered = reqCopy.mWalBuffered; - const uint64_t flushed = logging.mWalFlushed; - const uint64_t bufferEnd = mStore->mStoreOption->mWalBufferSize; - if (buffered > flushed) { - append(logging.mWalBuffer, flushed, buffered); - } else if (buffered < flushed) { - append(logging.mWalBuffer, flushed, bufferEnd); - append(logging.mWalBuffer, 0, buffered); - } - } - - if (!mAIo.IsEmpty() && mStore->mStoreOption->mEnableWalFsync) { - mAIo.PrepareFsync(mWalFd); - } -} - -void GroupCommitter::flushWalRecords() { - // submit all log writes using a single system call. - if (auto res = mAIo.SubmitAll(); !res) { - Log::Error("Failed to submit all IO, error={}", res.error().ToString()); - } - - //! wait all to finish. - timespec timeout = {1, 0}; // 1s - if (auto res = mAIo.WaitAll(&timeout); !res) { - Log::Error("Failed to wait all IO, error={}", res.error().ToString()); - } - - //! sync the metadata in the end. - if (mStore->mStoreOption->mEnableWalFsync) { - auto failed = fdatasync(mWalFd); - if (failed) { - Log::Error("fdatasync failed, errno={}, error={}", errno, strerror(errno)); - } - } -} - -void GroupCommitter::determineCommitableTx(TXID minFlushedSysTx, TXID minFlushedUsrTx, - const std::vector& numRfaTxs, - const std::vector& walFlushReqCopies) { - for (WORKERID workerId = 0; workerId < mWorkerCtxs.size(); workerId++) { - auto& logging = mWorkerCtxs[workerId]->mLogging; - const auto& reqCopy = walFlushReqCopies[workerId]; - - // update the flushed commit TS info - logging.mWalFlushed.store(reqCopy.mWalBuffered, std::memory_order_release); - - // commit transactions with remote dependency - TXID maxCommitTs = 0; - { - std::unique_lock g(logging.mTxToCommitMutex); - uint64_t i = 0; - for (; i < logging.mTxToCommit.size(); ++i) { - auto& tx = logging.mTxToCommit[i]; - if (!tx.CanCommit(minFlushedSysTx, minFlushedUsrTx)) { - break; - } - maxCommitTs = std::max(maxCommitTs, tx.mCommitTs); - tx.mState = TxState::kCommitted; - LS_DLOG("Transaction with remote dependency committed, workerId={}, startTs={}, " - "commitTs={}, minFlushedSysTx={}, minFlushedUsrTx={}", - workerId, tx.mStartTs, tx.mCommitTs, minFlushedSysTx, minFlushedUsrTx); - } - if (i > 0) { - logging.mTxToCommit.erase(logging.mTxToCommit.begin(), logging.mTxToCommit.begin() + i); - } - } - - // commit transactions without remote dependency - TXID maxCommitTsRfa = 0; - { - std::unique_lock g(logging.mRfaTxToCommitMutex); - uint64_t i = 0; - for (; i < numRfaTxs[workerId]; ++i) { - auto& tx = logging.mRfaTxToCommit[i]; - maxCommitTsRfa = std::max(maxCommitTsRfa, tx.mCommitTs); - tx.mState = TxState::kCommitted; - LS_DLOG("Transaction without remote dependency committed, workerId={}, startTs={}, " - "commitTs={}", - workerId, tx.mStartTs, tx.mCommitTs); - } - if (i > 0) { - logging.mRfaTxToCommit.erase(logging.mRfaTxToCommit.begin(), - logging.mRfaTxToCommit.begin() + i); - } - } - - // Has committed transaction - TXID signaledUpTo = 0; - if (maxCommitTs == 0 && maxCommitTsRfa != 0) { - signaledUpTo = maxCommitTsRfa; - } else if (maxCommitTs != 0 && maxCommitTsRfa == 0) { - signaledUpTo = maxCommitTs; - } else if (maxCommitTs != 0 && maxCommitTsRfa != 0) { - signaledUpTo = std::min(maxCommitTs, maxCommitTsRfa); - } - if (signaledUpTo > 0) { - 
logging.UpdateSignaledCommitTs(signaledUpTo); - } - } - - mGlobalMinFlushedSysTx.store(minFlushedSysTx, std::memory_order_release); -} - -void GroupCommitter::append(uint8_t* buf, uint64_t lower, uint64_t upper) { - auto lowerAligned = utils::AlignDown(lower, kAligment); - auto upperAligned = utils::AlignUp(upper, kAligment); - auto* bufAligned = buf + lowerAligned; - auto countAligned = upperAligned - lowerAligned; - auto offsetAligned = utils::AlignDown(mWalSize, kAligment); - - mAIo.PrepareWrite(mWalFd, bufAligned, countAligned, offsetAligned); - mWalSize += upper - lower; -}; - -} // namespace leanstore::cr \ No newline at end of file diff --git a/src/concurrency/HistoryStorage.cpp b/src/concurrency/HistoryStorage.cpp index 5fbe09ed..9d6e1d0d 100644 --- a/src/concurrency/HistoryStorage.cpp +++ b/src/concurrency/HistoryStorage.cpp @@ -145,8 +145,9 @@ void HistoryStorage::PurgeVersions(TXID fromTxId, TXID toTxId, if (guardedLeaf->FreeSpaceAfterCompaction() >= BTreeNode::UnderFullSize()) { xIter.SetCleanUpCallback([&, toMerge = guardedLeaf.mBf] { JUMPMU_TRY() { - TXID sysTxId = btree->mStore->AllocSysTxTs(); - btree->TryMergeMayJump(sysTxId, *toMerge); + // wal logging is not supported for internal memory only history b+ tree, no need to + // start a sys tx + btree->TryMergeMayJump(0, *toMerge); } JUMPMU_CATCH() { } @@ -235,8 +236,9 @@ void HistoryStorage::PurgeVersions(TXID fromTxId, TXID toTxId, if (guardedLeaf->FreeSpaceAfterCompaction() >= BTreeNode::UnderFullSize()) { xIter.SetCleanUpCallback([&, toMerge = guardedLeaf.mBf] { JUMPMU_TRY() { - TXID sysTxId = btree->mStore->AllocSysTxTs(); - btree->TryMergeMayJump(sysTxId, *toMerge); + // wal logging is not supported for internal memory only history b+ tree, no need + // to start a sys tx + btree->TryMergeMayJump(0, *toMerge); } JUMPMU_CATCH() { } diff --git a/src/concurrency/Logging.cpp b/src/concurrency/Logging.cpp index 67e70fa0..dfc4e90e 100644 --- a/src/concurrency/Logging.cpp +++ b/src/concurrency/Logging.cpp @@ -1,6 +1,5 @@ #include "leanstore/concurrency/Logging.hpp" -#include "leanstore/Exceptions.hpp" #include "leanstore/concurrency/WalEntry.hpp" #include "leanstore/concurrency/WorkerContext.hpp" #include "leanstore/utils/Log.hpp" @@ -13,51 +12,14 @@ namespace leanstore::cr { -uint32_t Logging::walContiguousFreeSpace() { - const auto flushed = mWalFlushed.load(); - if (flushed <= mWalBuffered) { - return mWalBufferSize - mWalBuffered; - } - return flushed - mWalBuffered; -} - -void Logging::ReserveContiguousBuffer(uint32_t bytesRequired) { - // Spin until there is enough space. The wal ring buffer space is reclaimed - // when the group commit thread commits the written wal entries. 
- while (true) { - const auto flushed = mWalFlushed.load(); - if (flushed <= mWalBuffered) { - // carraige return, consume the last bytes from mWalBuffered to the end - if (mWalBufferSize - mWalBuffered < bytesRequired) { - WriteWalCarriageReturn(); - continue; - } - // Have enough space from mWalBuffered to the end - return; - } - - if (flushed - mWalBuffered < bytesRequired) { - // wait for group commit thread to commit the written wal entries - continue; - } - return; - } -} - void Logging::WriteWalTxAbort() { - // Reserve space - auto size = sizeof(WalTxAbort); - ReserveContiguousBuffer(size); - // Initialize a WalTxAbort - auto* data = mWalBuffer + mWalBuffered; + auto size = sizeof(WalTxAbort); + auto* data = mWalBuffer.Get(size); std::memset(data, 0, size); auto* entry [[maybe_unused]] = new (data) WalTxAbort(size); - // Submit the WalTxAbort to group committer - mWalBuffered += size; - publishWalFlushReq(); - + mWalBuffer.Advance(size); LS_DLOG("WriteWalTxAbort, workerId={}, startTs={}, walJson={}", WorkerContext::My().mWorkerId, WorkerContext::My().mActiveTx.mStartTs, utils::ToJsonString(entry)); } @@ -65,67 +27,13 @@ void Logging::WriteWalTxAbort() { void Logging::WriteWalTxFinish() { // Reserve space auto size = sizeof(WalTxFinish); - ReserveContiguousBuffer(size); - - // Initialize a WalTxFinish - auto* data = mWalBuffer + mWalBuffered; + auto* data = mWalBuffer.Get(size); std::memset(data, 0, size); auto* entry [[maybe_unused]] = new (data) WalTxFinish(WorkerContext::My().mActiveTx.mStartTs); - // Submit the WalTxAbort to group committer - mWalBuffered += size; - publishWalFlushReq(); - + mWalBuffer.Advance(size); LS_DLOG("WriteWalTxFinish, workerId={}, startTs={}, walJson={}", WorkerContext::My().mWorkerId, WorkerContext::My().mActiveTx.mStartTs, utils::ToJsonString(entry)); } -void Logging::WriteWalCarriageReturn() { - LS_DCHECK(mWalFlushed <= mWalBuffered, - "CarriageReturn should only used for the last bytes in the wal buffer"); - auto entrySize = mWalBufferSize - mWalBuffered; - auto* entryPtr = mWalBuffer + mWalBuffered; - new (entryPtr) WalCarriageReturn(entrySize); - mWalBuffered = 0; - publishWalBufferedOffset(); -} - -void Logging::SubmitWALEntryComplex(uint64_t totalSize) { - mActiveWALEntryComplex->mCrc32 = mActiveWALEntryComplex->ComputeCRC32(); - mWalBuffered += totalSize; - publishWalFlushReq(); - - LS_DLOG("SubmitWal, workerId={}, startTs={}, walJson={}", WorkerContext::My().mWorkerId, - WorkerContext::My().mActiveTx.mStartTs, utils::ToJsonString(mActiveWALEntryComplex)); -} - -void Logging::publishWalBufferedOffset() { - mWalFlushReq.UpdateAttribute(&WalFlushReq::mWalBuffered, mWalBuffered); -} - -void Logging::publishWalFlushReq() { - WalFlushReq current(mWalBuffered, mSysTxWrittern, WorkerContext::My().mActiveTx.mStartTs); - mWalFlushReq.Set(current); -} - -// Called by worker, so concurrent writes on the buffer -void Logging::IterateCurrentTxWALs(std::function callback) { - uint64_t cursor = mTxWalBegin; - while (cursor != mWalBuffered) { - const WalEntry& entry = *reinterpret_cast(mWalBuffer + cursor); - DEBUG_BLOCK() { - if (entry.mType == WalEntry::Type::kComplex) { - reinterpret_cast(&entry)->CheckCRC(); - } - } - - if (entry.mType == WalEntry::Type::kCarriageReturn) { - cursor = 0; - } else { - callback(entry); - cursor += WalEntry::Size(&entry); - } - } -} - } // namespace leanstore::cr diff --git a/src/concurrency/Recovery.cpp b/src/concurrency/Recovery.cpp index a5066fac..bf3ed7e7 100644 --- a/src/concurrency/Recovery.cpp +++ 
b/src/concurrency/Recovery.cpp @@ -8,6 +8,7 @@ #include "leanstore/concurrency/WalEntry.hpp" #include "leanstore/sync/HybridGuard.hpp" #include "leanstore/utils/Defer.hpp" +#include "leanstore/utils/Error.hpp" #include "leanstore/utils/Log.hpp" #include @@ -96,8 +97,8 @@ Result Recovery::analysis() { continue; } default: { - Log::Fatal("Unrecognized WalEntry type: {}, offset={}, walFd={}", - static_cast(walEntry->mType), startOffset, mStore->mWalFd); + Log::Fatal("Unrecognized WalEntry type: {}, offset={}", static_cast(walEntry->mType), + startOffset); } } } @@ -460,23 +461,27 @@ storage::BufferFrame& Recovery::resolvePage(PID pageId) { } // TODO(zz-jason): refactor with aio -Result Recovery::readFromWalFile(int64_t offset, size_t nbytes, void* destination) { - auto fileName = mStore->GetWalFilePath(); - FILE* fp = fopen(fileName.c_str(), "rb"); - if (fp == nullptr) { - return std::unexpected(utils::Error::FileOpen(fileName, errno, strerror(errno))); - } - SCOPED_DEFER(fclose(fp)); - - if (fseek(fp, offset, SEEK_SET) != 0) { - return std::unexpected(utils::Error::FileSeek(fileName, errno, strerror(errno))); - } - - if (fread(destination, 1, nbytes, fp) != nbytes) { - return std::unexpected(utils::Error::FileRead(fileName, errno, strerror(errno))); - } - - return {}; +Result Recovery::readFromWalFile(int64_t offset [[maybe_unused]], + size_t nbytes [[maybe_unused]], + void* destination [[maybe_unused]]) { + // auto fileName = mStore->GetWalFilePath(); + // FILE* fp = fopen(fileName.c_str(), "rb"); + // if (fp == nullptr) { + // return std::unexpected(utils::Error::FileOpen(fileName, errno, strerror(errno))); + // } + // SCOPED_DEFER(fclose(fp)); + + // if (fseek(fp, offset, SEEK_SET) != 0) { + // return std::unexpected(utils::Error::FileSeek(fileName, errno, strerror(errno))); + // } + + // if (fread(destination, 1, nbytes, fp) != nbytes) { + // return std::unexpected(utils::Error::FileRead(fileName, errno, strerror(errno))); + // } + + // return {}; + + return std::unexpected(utils::Error::General("Not implemented")); } } // namespace leanstore::cr \ No newline at end of file diff --git a/src/concurrency/WorkerContext.cpp b/src/concurrency/WorkerContext.cpp index 9f286c94..69603644 100644 --- a/src/concurrency/WorkerContext.cpp +++ b/src/concurrency/WorkerContext.cpp @@ -4,7 +4,6 @@ #include "leanstore/LeanStore.hpp" #include "leanstore/buffer-manager/TreeRegistry.hpp" #include "leanstore/concurrency/CRManager.hpp" -#include "leanstore/concurrency/GroupCommitter.hpp" #include "leanstore/concurrency/Logging.hpp" #include "leanstore/concurrency/Transaction.hpp" #include "leanstore/concurrency/WalEntry.hpp" @@ -14,7 +13,6 @@ #include #include -#include namespace leanstore::cr { @@ -25,34 +23,23 @@ thread_local PerfCounters tlsPerfCounters; WorkerContext::WorkerContext(uint64_t workerId, std::vector& allWorkers, leanstore::LeanStore* store) : mStore(store), + mLogging(store->mStoreOption->mWalBufferSize, store->GetDbWalPath(workerId), 16, + store->mStoreOption->mCreateFromScratch), mCc(store, allWorkers.size()), mActiveTxId(0), mWorkerId(workerId), mAllWorkers(allWorkers) { - - // init wal buffer - mLogging.mWalBufferSize = mStore->mStoreOption->mWalBufferSize; - mLogging.mWalBuffer = (uint8_t*)(std::aligned_alloc(512, mLogging.mWalBufferSize)); - std::memset(mLogging.mWalBuffer, 0, mLogging.mWalBufferSize); - mCc.mLcbCacheVal = std::make_unique(mAllWorkers.size()); mCc.mLcbCacheKey = std::make_unique(mAllWorkers.size()); } -WorkerContext::~WorkerContext() { - free(mLogging.mWalBuffer); - 
mLogging.mWalBuffer = nullptr; -} - void WorkerContext::StartTx(TxMode mode, IsolationLevel level, bool isReadOnly) { Transaction prevTx [[maybe_unused]] = mActiveTx; LS_DCHECK(prevTx.mState != TxState::kStarted, "Previous transaction not ended, workerId={}, startTs={}, txState={}", mWorkerId, prevTx.mStartTs, TxStatUtil::ToString(prevTx.mState)); - SCOPED_DEFER({ - LS_DLOG("Start transaction, workerId={}, startTs={}, globalMinFlushedSysTx={}", mWorkerId, - mActiveTx.mStartTs, mStore->mCRManager->mGroupCommitter->mGlobalMinFlushedSysTx.load()); - }); + SCOPED_DEFER( + LS_DLOG("Start transaction, workerId={}, startTs={}", mWorkerId, mActiveTx.mStartTs)); mActiveTx.Start(mode, level); @@ -61,10 +48,7 @@ void WorkerContext::StartTx(TxMode mode, IsolationLevel level, bool isReadOnly) } //! Reset the max observed system transaction id - mActiveTx.mMaxObservedSysTxId = mStore->mCRManager->mGroupCommitter->mGlobalMinFlushedSysTx; - - // Init wal and group commit related transaction information - mLogging.mTxWalBegin = mLogging.mWalBuffered; + mActiveTx.mDependentSysTx = Logging::sGlobalMinCommittedSysTx.load(); // For now, we only support SI and SSI if (level < IsolationLevel::kSnapshotIsolation) { @@ -102,55 +86,45 @@ void WorkerContext::CommitTx() { COUNTER_INC(&tlsPerfCounters.mTxWithoutRemoteDependencies); } mActiveTx.mState = TxState::kCommitted; + // Reset startTs so that other transactions can safely update the global + // transaction watermarks and garbage collect the unused versions. + mActiveTxId.store(0, std::memory_order_release); }); - if (!mActiveTx.mIsDurable) { - return; - } - - // Reset mCommandId on commit - mCommandId = 0; - if (mActiveTx.mHasWrote) { - mActiveTx.mCommitTs = mStore->AllocUsrTxTs(); - mCc.mCommitTree.AppendCommitLog(mActiveTx.mStartTs, mActiveTx.mCommitTs); - mCc.mLatestCommitTs.store(mActiveTx.mCommitTs, std::memory_order_release); - } else { + if (!mActiveTx.mHasWrote) { LS_DLOG("Transaction has no writes, skip assigning commitTs, append log to " "commit tree, and group commit, workerId={}, actual startTs={}", mWorkerId, mActiveTx.mStartTs); + return; } - // Reset startTs so that other transactions can safely update the global - // transaction watermarks and garbage collect the unused versions. 
- mActiveTxId.store(0, std::memory_order_release); - - if (!mActiveTx.mHasWrote) { + if (!mActiveTx.mIsDurable) { return; } - if (mActiveTx.mIsDurable) { - mLogging.WriteWalTxFinish(); - } + // Reset mCommandId on commit + mCommandId = 0; + mActiveTx.mCommitTs = mStore->AllocUsrTxTs(); + mCc.mCommitTree.AppendCommitLog(mActiveTx.mStartTs, mActiveTx.mCommitTs); + mCc.mLatestCommitTs.store(mActiveTx.mCommitTs, std::memory_order_release); - // for group commit - if (mActiveTx.mHasRemoteDependency) { - std::unique_lock g(mLogging.mTxToCommitMutex); - mLogging.mTxToCommit.push_back(mActiveTx); - } else { - std::unique_lock g(mLogging.mRfaTxToCommitMutex); - mLogging.mRfaTxToCommit.push_back(mActiveTx); - } + // write a finish wal record to indicate the end of current transaction + mLogging.WriteWalTxFinish(); + + // persist all WAL records of current transaction to disk + mLogging.mWalBuffer.Persist(); // Cleanup versions in history tree mCc.GarbageCollection(); // Wait logs to be flushed - LS_DLOG("Wait transaction to commit, workerId={}, startTs={}, commitTs={}, maxObseredSysTx={}, " + LS_DLOG("Wait transaction to commit, workerId={}, startTs={}, commitTs={}, dependentSysTx={}, " "hasRemoteDep={}", - mWorkerId, mActiveTx.mStartTs, mActiveTx.mCommitTs, mActiveTx.mMaxObservedSysTxId, + mWorkerId, mActiveTx.mStartTs, mActiveTx.mCommitTs, mActiveTx.mDependentSysTx, mActiveTx.mHasRemoteDependency); - - mLogging.WaitToCommit(mActiveTx.mCommitTs); + if (mActiveTx.mHasRemoteDependency) { + waitDependencyToCommit(); + } } //! TODO(jian.z): revert changes made in-place on the btree process of a transaction abort: @@ -171,18 +145,18 @@ void WorkerContext::AbortTx() { COUNTER_INC(&tlsPerfCounters.mTxWithoutRemoteDependencies); } mActiveTxId.store(0, std::memory_order_release); - Log::Info("Transaction aborted, workerId={}, startTs={}, commitTs={}, maxObservedSysTx={}", - mWorkerId, mActiveTx.mStartTs, mActiveTx.mCommitTs, mActiveTx.mMaxObservedSysTxId); + Log::Info("Transaction aborted, workerId={}, startTs={}, commitTs={}, dependentSysTx={}", + mWorkerId, mActiveTx.mStartTs, mActiveTx.mCommitTs, mActiveTx.mDependentSysTx); }); - if (!(mActiveTx.mState == TxState::kStarted && mActiveTx.mIsDurable)) { + if (mActiveTx.mState != TxState::kStarted || !mActiveTx.mIsDurable || !mActiveTx.mHasWrote) { return; } // TODO(jian.z): support reading from WAL file once LS_DCHECK(!mActiveTx.mWalExceedBuffer, "Aborting from WAL file is not supported yet"); std::vector entries; - mLogging.IterateCurrentTxWALs([&](const WalEntry& entry) { + mLogging.mWalBuffer.VisitWalRecordsDesc([&](const WalEntry& entry) { if (entry.mType == WalEntry::Type::kComplex) { entries.push_back(&entry); }