Skip to content

Commit

Permalink
chore: replace gsn with psn in wal
Browse files Browse the repository at this point in the history
  • Loading branch information
zz-jason committed Dec 18, 2023
1 parent 9719928 commit b65f1fb
Show file tree
Hide file tree
Showing 11 changed files with 299 additions and 255 deletions.
9 changes: 6 additions & 3 deletions source/concurrency-recovery/HistoryTree.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,14 @@ using BTreeLL = leanstore::storage::btree::BTreeLL;
class HistoryTree : public HistoryTreeInterface {
private:
struct alignas(64) Session {
BufferFrame *rightmost_bf, *leftmost_bf;
u64 rightmost_version, leftmost_version;
BufferFrame* rightmost_bf;
BufferFrame* leftmost_bf;
u64 rightmost_version;
u64 leftmost_version;
s64 rightmost_pos = -1;
TXID last_tx_id;
bool rightmost_init = false, leftmost_init = false;
bool rightmost_init = false;
bool leftmost_init = false;
};
Session update_sessions[leanstore::cr::STATIC_MAX_WORKERS];
Session remove_sessions[leanstore::cr::STATIC_MAX_WORKERS];
Expand Down
43 changes: 38 additions & 5 deletions source/concurrency-recovery/Recovery.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include "Units.hpp"
#include "storage/btree/BTreeLL.hpp"
#include "storage/btree/BTreeVI.hpp"
#include "storage/btree/core/BTreeExclusiveIterator.hpp"
#include "utils/Defer.hpp"

#include <glog/logging.h>
Expand Down Expand Up @@ -154,10 +155,10 @@ inline void Recovery::Analysis() {
mActiveTxTable[walEntry->mTxId] = offset;

auto& bf = ResolvePage(complexEntry->mPageId);
DCHECK(bf.header.mPageId == complexEntry->mPageId);
DCHECK(bf.page.mBTreeId == complexEntry->mTreeId);
// DCHECK(bf.header.mPageId == complexEntry->mPageId);
// DCHECK(bf.page.mBTreeId == complexEntry->mTreeId);

if (complexEntry->gsn > bf.page.mGSN &&
if (complexEntry->mPSN >= bf.page.mPSN &&
mDirtyPageTable.find(complexEntry->mPageId) ==
mDirtyPageTable.end()) {
// record the first WALEntry that makes the page dirty
Expand Down Expand Up @@ -195,17 +196,49 @@ inline void Recovery::Redo() {
auto complexEntry = reinterpret_cast<WALEntryComplex*>(walEntryPtr);
DCHECK(bytesRead == complexEntry->size);
if (mDirtyPageTable.find(complexEntry->mPageId) == mDirtyPageTable.end() ||
complexEntry->gsn < mDirtyPageTable[complexEntry->mPageId]) {
offset < mDirtyPageTable[complexEntry->mPageId]) {
offset += bytesRead;
continue;
}

// TODO(jian.z): redo previous operations on the page
auto& bf = ResolvePage(complexEntry->mPageId);
DCHECK(bf.header.mPageId == complexEntry->mPageId);
DCHECK(bf.page.mBTreeId == complexEntry->mTreeId);
// DCHECK(bf.page.mBTreeId == complexEntry->mTreeId);
SCOPED_DEFER(bf.header.mKeepInMemory = false);

auto walPayload = reinterpret_cast<leanstore::storage::btree::WALPayload*>(
complexEntry->payload);
switch (walPayload->type) {
case leanstore::storage::btree::WALPayload::TYPE::WALInsert: {
auto node = reinterpret_cast<leanstore::storage::btree::BTreeNode*>(
bf.page.mPayload);
auto walInsert =
dynamic_cast<leanstore::storage::btree::WALInsert*>(walPayload);
auto key = walInsert->GetKey();
auto val = walInsert->GetVal();
auto payloadSize =
val.size() + sizeof(leanstore::storage::btree::BTreeVI::ChainedTuple);
auto slotId = node->insertDoNotCopyPayload(key, payloadSize, -1);
auto payload = MutableSlice(node->ValData(slotId), node->ValSize(slotId));
// TODO(jian.z): store worker id in wal
// TODO(jian.z): store transaction start ts in wal transaction begin
auto& primaryVersion =
*new (payload.data())
leanstore::storage::btree::BTreeVI::ChainedTuple(
cr::Worker::my().mWorkerId, cr::activeTX().startTS());
std::memcpy(primaryVersion.payload, val.data(), val.size());

break;
// insert on the btree
// for convenience, use BTreeExclusiveIterator directly
}
default: {
DCHECK(false) << "Unhandled WALPayload::TYPE: "
<< std::to_string(static_cast<u64>(walPayload->type));
}
}

offset += bytesRead;
continue;
}
Expand Down
21 changes: 10 additions & 11 deletions source/concurrency-recovery/WALEntry.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,10 +95,9 @@ class WALEntrySimple : public WALEntry {

class WALEntryComplex : public WALEntry {
public:
/// Global sequence number of the WALEntry, used for concurrency control and
/// transaction isolation. Also used to verify whether a page has been
/// modified since last checkpoint.
LID gsn;
/// Page sequence number of the WALEntry, indicate the page version this WAL
/// entry is based on.
LID mPSN;

/// The btree ID of the WALEntry, used to identify the btree node together
/// with page ID.
Expand All @@ -115,8 +114,8 @@ class WALEntryComplex : public WALEntry {
public:
WALEntryComplex() = default;

WALEntryComplex(LID lsn, u64 size, LID gsn, TREEID treeId, PID pageId)
: WALEntry(lsn, size, TYPE::COMPLEX), gsn(gsn), mTreeId(treeId),
WALEntryComplex(LID lsn, u64 size, LID psn, TREEID treeId, PID pageId)
: WALEntry(lsn, size, TYPE::COMPLEX), mPSN(psn), mTreeId(treeId),
mPageId(pageId) {
}

Expand Down Expand Up @@ -224,25 +223,25 @@ inline std::unique_ptr<rapidjson::Document> WALEntry::ToJSON() {
inline std::unique_ptr<rapidjson::Document> WALEntryComplex::ToJSON() {
auto doc = WALEntry::ToJSON();

// gsn
// psn
{
rapidjson::Value member;
member.SetUint64(gsn);
doc->AddMember("GSN", member, doc->GetAllocator());
member.SetUint64(mPSN);
doc->AddMember("mPSN", member, doc->GetAllocator());
}

// treeId
{
rapidjson::Value member;
member.SetInt64(mTreeId);
doc->AddMember("treeId", member, doc->GetAllocator());
doc->AddMember("mTreeId", member, doc->GetAllocator());
}

// pageId
{
rapidjson::Value member;
member.SetUint64(mPageId);
doc->AddMember("pageId", member, doc->GetAllocator());
doc->AddMember("mPageId", member, doc->GetAllocator());
}

return doc;
Expand Down
4 changes: 2 additions & 2 deletions source/concurrency-recovery/Worker.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -455,7 +455,7 @@ inline Logging& Logging::other(WORKERID otherWorkerId) {

template <typename T, typename... Args>
inline WALPayloadHandler<T> Logging::ReserveWALEntryComplex(u64 payloadSize,
PID pageId, LID gsn,
PID pageId, LID psn,
TREEID treeId,
Args&&... args) {
SCOPED_DEFER(mPrevLSN = mActiveWALEntryComplex->lsn);
Expand All @@ -466,7 +466,7 @@ inline WALPayloadHandler<T> Logging::ReserveWALEntryComplex(u64 payloadSize,
DCHECK(walContiguousFreeSpace() >= entrySize);

mActiveWALEntryComplex =
new (entryPtr) WALEntryComplex(entryLSN, entrySize, gsn, treeId, pageId);
new (entryPtr) WALEntryComplex(entryLSN, entrySize, psn, treeId, pageId);
mActiveWALEntryComplex->mPrevLSN = mPrevLSN;
mActiveWALEntryComplex->mTxId =
leanstore::cr::Worker::my().mActiveTx.mStartTs;
Expand Down
7 changes: 4 additions & 3 deletions source/storage/btree/BTreeVI.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,7 @@ OP_RESULT BTreeVI::insert(Slice key, Slice val) {

cr::activeTX().markAsWrite();
cr::Worker::my().mLogging.walEnsureEnoughSpace(PAGE_SIZE * 1);
u16 payload_length = val.size() + sizeof(ChainedTuple);
u16 payloadSize = val.size() + sizeof(ChainedTuple);

while (true) {
JUMPMU_TRY() {
Expand All @@ -409,7 +409,7 @@ OP_RESULT BTreeVI::insert(Slice key, Slice val) {
// Not implemented: maybe it has been removed but no GCed
}

ret = iterator.enoughSpaceInCurrentNode(key, payload_length);
ret = iterator.enoughSpaceInCurrentNode(key, payloadSize);
if (ret == OP_RESULT::NOT_ENOUGH_SPACE) {
iterator.splitForKey(key);
JUMPMU_CONTINUE;
Expand All @@ -420,7 +420,8 @@ OP_RESULT BTreeVI::insert(Slice key, Slice val) {
key.size() + val.size(), key, val);
walHandler.SubmitWal();

iterator.insertInCurrentNode(key, payload_length);
// insert
iterator.insertInCurrentNode(key, payloadSize);
MutableSlice payload = iterator.mutableValue();
auto& primaryVersion = *new (payload.data()) ChainedTuple(
cr::Worker::my().mWorkerId, cr::activeTX().startTS());
Expand Down
Loading

0 comments on commit b65f1fb

Please sign in to comment.