Skip to content

Commit

Permalink
feat: support wal for btree node creation
Browse files Browse the repository at this point in the history
  • Loading branch information
zz-jason committed Dec 19, 2023
1 parent c378afa commit 1dbda17
Show file tree
Hide file tree
Showing 10 changed files with 141 additions and 69 deletions.
4 changes: 4 additions & 0 deletions source/LeanStore.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ class LeanStore {
[[nodiscard]] bool RegisterBTreeLL(
const std::string& name, storage::btree::BTreeGeneric::Config& config,
storage::btree::BTreeLL** btree) {
DCHECK(cr::Worker::my().IsTxStarted());
// create btree for graveyard
*btree = storage::btree::BTreeLL::Create(name, config);
return (*btree) != nullptr;
Expand All @@ -73,6 +74,7 @@ class LeanStore {
/// Unregister a BTreeLL
/// @param name The unique name of the btree
void UnRegisterBTreeLL(const std::string& name) {
DCHECK(cr::Worker::my().IsTxStarted());
auto btree = dynamic_cast<storage::btree::BTreeGeneric*>(
storage::TreeRegistry::sInstance->GetTree(name));
leanstore::storage::btree::BTreeGeneric::FreeAndReclaim(*btree);
Expand All @@ -88,6 +90,7 @@ class LeanStore {
[[nodiscard]] bool RegisterBTreeVI(
const std::string& name, storage::btree::BTreeGeneric::Config& config,
storage::btree::BTreeVI** btree) {
DCHECK(cr::Worker::my().IsTxStarted());
bool success(false);
*btree = nullptr;

Expand Down Expand Up @@ -130,6 +133,7 @@ class LeanStore {
/// Unregister a BTreeVI
/// @param name The unique name of the btree
void UnRegisterBTreeVI(const std::string& name) {
DCHECK(cr::Worker::my().IsTxStarted());
auto btree = dynamic_cast<storage::btree::BTreeGeneric*>(
storage::TreeRegistry::sInstance->GetTree(name));
leanstore::storage::btree::BTreeGeneric::FreeAndReclaim(*btree);
Expand Down
11 changes: 9 additions & 2 deletions source/concurrency-recovery/Recovery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,16 @@ void Recovery::Redo() {

auto walPayload = reinterpret_cast<WALPayload*>(complexEntry->payload);
switch (walPayload->type) {
case WALPayload::TYPE::WALInitPage: {
auto walInitPage = reinterpret_cast<WALInitPage*>(complexEntry->payload);
HybridGuard guard(&bf.header.mLatch);
GuardedBufferFrame<BTreeNode> guardedNode(std::move(guard), &bf);
guardedNode->mIsLeaf = walInitPage->mIsLeaf;
bf.page.mBTreeId = complexEntry->mTreeId;
break;
}
case WALPayload::TYPE::WALInsert: {
auto walInsert = dynamic_cast<WALInsert*>(walPayload);

auto walInsert = reinterpret_cast<WALInsert*>(complexEntry->payload);
HybridGuard guard(&bf.header.mLatch);
GuardedBufferFrame<BTreeNode> guardedNode(std::move(guard), &bf);

Expand Down
2 changes: 2 additions & 0 deletions source/storage/btree/BTreeLL.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ namespace btree {
#define ARRAY_ON_STACK(varName, T, N) T* varName = (T*)alloca((N) * sizeof(T));

OP_RESULT BTreeLL::Lookup(Slice key, ValCallback valCallback) {
DCHECK(cr::Worker::my().IsTxStarted());
while (true) {
JUMPMU_TRY() {
GuardedBufferFrame<BTreeNode> leaf;
Expand Down Expand Up @@ -140,6 +141,7 @@ OP_RESULT BTreeLL::scanDesc(Slice scanKey, ScanCallback callback) {
}

OP_RESULT BTreeLL::insert(Slice key, Slice val) {
DCHECK(cr::Worker::my().IsTxStarted());
cr::activeTX().markAsWrite();
if (config.mEnableWal) {
cr::Worker::my().mLogging.walEnsureEnoughSpace(PAGE_SIZE * 1);
Expand Down
27 changes: 2 additions & 25 deletions source/storage/btree/BTreeVI.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,7 @@ namespace storage {
namespace btree {

OP_RESULT BTreeVI::Lookup(Slice key, ValCallback valCallback) {
auto autoCommit(false);
if (!cr::Worker::my().IsTxStarted()) {
DLOG(INFO) << "Start implicit transaction";
cr::Worker::my().startTX();
autoCommit = true;
}
SCOPED_DEFER({
// auto-commit the implicit transaction
if (autoCommit) {
cr::Worker::my().commitTX();
}
});
DCHECK(cr::Worker::my().IsTxStarted());

OP_RESULT ret = lookupOptimistic(key, valCallback);
if (ret == OP_RESULT::OTHER) {
Expand Down Expand Up @@ -364,20 +353,8 @@ OP_RESULT BTreeVI::updateSameSizeInPlace(
}

OP_RESULT BTreeVI::insert(Slice key, Slice val) {
// check implicit transaction
// TODO(jian.z): should we force users to explicitly start a transaction?
auto autoCommit(false);
if (!cr::Worker::my().IsTxStarted()) {
DLOG(INFO) << "Start implicit transaction";
cr::Worker::my().startTX();
autoCommit = true;
}

DCHECK(cr::Worker::my().IsTxStarted());
SCOPED_DEFER({
// auto-commit the implicit transaction
if (autoCommit) {
cr::Worker::my().commitTX();
}
rapidjson::Document doc(rapidjson::kObjectType);
BTreeGeneric::ToJSON(*this, &doc);
DLOG(INFO) << "BTreeVI after insert: " << leanstore::utils::JsonToStr(&doc);
Expand Down
43 changes: 26 additions & 17 deletions source/storage/btree/core/BTreeGeneric.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,28 @@ void BTreeGeneric::Init(TREEID btreeId, Config config) {
guard.unlock();

auto guardedRoot = GuardedBufferFrame<BTreeNode>(btreeId);
auto exclusiveGuardedRoot =
auto xGuardedRoot =
ExclusiveGuardedBufferFrame<BTreeNode>(std::move(guardedRoot));
exclusiveGuardedRoot.InitPayload(true);
xGuardedRoot.InitPayload(true);

GuardedBufferFrame<BTreeNode> guardedMeta(mMetaNodeSwip);
ExclusiveGuardedBufferFrame exclusiveGuardedMeta(std::move(guardedMeta));
exclusiveGuardedMeta->mIsLeaf = false;
// HACK: use upper of meta node as a swip to the storage root
exclusiveGuardedMeta->mRightMostChildSwip = exclusiveGuardedRoot.bf();

// TODO: write WALs
exclusiveGuardedRoot.IncPageGSN();
exclusiveGuardedMeta.IncPageGSN();
ExclusiveGuardedBufferFrame xGuardedMeta(std::move(guardedMeta));
xGuardedMeta->mIsLeaf = false;
xGuardedMeta->mRightMostChildSwip = xGuardedRoot.bf();

// Record WAL
if (config.mEnableWal) {
auto rootWalHandler = xGuardedRoot.ReserveWALPayload<WALInitPage>(
0, mTreeId, xGuardedRoot->mIsLeaf);
rootWalHandler.SubmitWal();

auto metaWalHandler = xGuardedMeta.ReserveWALPayload<WALInitPage>(
0, mTreeId, xGuardedMeta->mIsLeaf);
metaWalHandler.SubmitWal();
}

xGuardedRoot.IncPageGSN();
xGuardedMeta.IncPageGSN();
}

void BTreeGeneric::trySplit(BufferFrame& toSplit, s16 favoredSplitPos) {
Expand Down Expand Up @@ -113,11 +122,11 @@ void BTreeGeneric::trySplit(BufferFrame& toSplit, s16 favoredSplitPos) {
};
if (config.mEnableWal) {
auto newRootWalHandler =
xGuardedNewRoot.ReserveWALPayload<WALInitPage>(0, mTreeId);
xGuardedNewRoot.ReserveWALPayload<WALInitPage>(0, mTreeId, false);
newRootWalHandler.SubmitWal();

auto newLeftWalHandler =
xGuardedNewLeft.ReserveWALPayload<WALInitPage>(0, mTreeId);
auto newLeftWalHandler = xGuardedNewLeft.ReserveWALPayload<WALInitPage>(
0, mTreeId, xGuardedChild->mIsLeaf);
newLeftWalHandler.SubmitWal();

auto parentPageId(xGuardedNewRoot.bf()->header.mPageId);
Expand Down Expand Up @@ -185,8 +194,8 @@ void BTreeGeneric::trySplit(BufferFrame& toSplit, s16 favoredSplitPos) {
};

if (config.mEnableWal) {
auto newLeftWalHandler =
xGuardedNewLeft.ReserveWALPayload<WALInitPage>(0, mTreeId);
auto newLeftWalHandler = xGuardedNewLeft.ReserveWALPayload<WALInitPage>(
0, mTreeId, xGuardedNewLeft->mIsLeaf);
newLeftWalHandler.SubmitWal();

auto parentPageId = xGuardedParent.bf()->header.mPageId;
Expand All @@ -205,8 +214,8 @@ void BTreeGeneric::trySplit(BufferFrame& toSplit, s16 favoredSplitPos) {
0, parentPageId, lhsPageId, rhsPageId);
parentWalHandler.SubmitWal();

newLeftWalHandler =
xGuardedNewLeft.ReserveWALPayload<WALInitPage>(0, mTreeId);
newLeftWalHandler = xGuardedNewLeft.ReserveWALPayload<WALInitPage>(
0, mTreeId, xGuardedNewLeft->mIsLeaf);
newLeftWalHandler.SubmitWal();

auto leftWalHandler =
Expand Down
10 changes: 5 additions & 5 deletions source/storage/btree/core/BTreeNode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,17 +39,17 @@ u16 BTreeNode::spaceNeeded(u16 keySize, u16 valSize) {
}

bool BTreeNode::canInsert(u16 keySize, u16 valSize) {
const u16 space_needed = spaceNeeded(keySize, valSize);
if (!hasEnoughSpaceFor(space_needed))
const u16 numSpaceNeeded = spaceNeeded(keySize, valSize);
if (!hasEnoughSpaceFor(numSpaceNeeded))
return false; // no space, insert fails
else
return true;
}

bool BTreeNode::prepareInsert(u16 keySize, u16 valSize) {
const u16 space_needed = spaceNeeded(keySize, valSize);
if (!requestSpaceFor(space_needed))
return false; // no space, insert fails
const u16 numSpaceNeeded = spaceNeeded(keySize, valSize);
if (!requestSpaceFor(numSpaceNeeded))
return false;
else
return true;
}
Expand Down
9 changes: 7 additions & 2 deletions source/storage/btree/core/BTreeWALPayload.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,17 @@ class WALPayload {
#undef TYPE_NAME
#undef DECR_TYPE

struct WALInitPage : WALPayload {
class WALInitPage : WALPayload {
public:
TREEID mTreeId;
bool mIsLeaf;

WALInitPage(TREEID treeId) : WALPayload(TYPE::WALInitPage), mTreeId(treeId) {
public:
WALInitPage(TREEID treeId, bool isLeaf)
: WALPayload(TYPE::WALInitPage), mTreeId(treeId), mIsLeaf(isLeaf) {
}

public:
virtual std::unique_ptr<rapidjson::Document> ToJSON() override {
auto doc = WALPayload::ToJSON();

Expand Down
16 changes: 16 additions & 0 deletions tests/BTreeLLTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,25 +45,33 @@ TEST_F(BTreeLLTest, BTreeLLCreate) {
};

cr::CRManager::sInstance->scheduleJobSync(0, [&]() {
cr::Worker::my().startTX();
SCOPED_DEFER(cr::Worker::my().commitTX());
EXPECT_TRUE(mLeanStore->RegisterBTreeLL(btreeName, btreeConfig, &btree));
EXPECT_NE(btree, nullptr);
});

// create btree with same should fail in the same worker
cr::CRManager::sInstance->scheduleJobSync(0, [&]() {
cr::Worker::my().startTX();
SCOPED_DEFER(cr::Worker::my().commitTX());
EXPECT_FALSE(mLeanStore->RegisterBTreeLL(btreeName, btreeConfig, &another));
EXPECT_EQ(another, nullptr);
});

// create btree with same should also fail in other workers
cr::CRManager::sInstance->scheduleJobSync(1, [&]() {
cr::Worker::my().startTX();
SCOPED_DEFER(cr::Worker::my().commitTX());
EXPECT_FALSE(mLeanStore->RegisterBTreeLL(btreeName, btreeConfig, &another));
EXPECT_EQ(another, nullptr);
});

// create btree with another different name should success
btreeName = "testTree2";
cr::CRManager::sInstance->scheduleJobSync(0, [&]() {
cr::Worker::my().startTX();
SCOPED_DEFER(cr::Worker::my().commitTX());
EXPECT_TRUE(mLeanStore->RegisterBTreeLL(btreeName, btreeConfig, &another));
EXPECT_NE(btree, nullptr);
});
Expand Down Expand Up @@ -95,20 +103,26 @@ TEST_F(BTreeLLTest, BTreeLLInsertAndLookup) {
.mUseBulkInsert = FLAGS_bulk_insert,
};
cr::CRManager::sInstance->scheduleJobSync(0, [&]() {
cr::Worker::my().startTX();
EXPECT_TRUE(mLeanStore->RegisterBTreeLL(btreeName, btreeConfig, &btree));
EXPECT_NE(btree, nullptr);
cr::Worker::my().commitTX();

// insert some values
cr::Worker::my().startTX();
for (size_t i = 0; i < numKVs; ++i) {
const auto& [key, val] = kvToTest[i];
EXPECT_EQ(btree->insert(Slice((const u8*)key.data(), key.size()),
Slice((const u8*)val.data(), val.size())),
OP_RESULT::OK);
}
cr::Worker::my().commitTX();
});

// query on the created btree in the same worker
cr::CRManager::sInstance->scheduleJobSync(0, [&]() {
cr::Worker::my().startTX();
SCOPED_DEFER(cr::Worker::my().commitTX());
std::string copiedValue;
auto copyValueOut = [&](Slice val) {
copiedValue = std::string((const char*)val.data(), val.size());
Expand All @@ -124,6 +138,8 @@ TEST_F(BTreeLLTest, BTreeLLInsertAndLookup) {

// query on the created btree in another worker
cr::CRManager::sInstance->scheduleJobSync(1, [&]() {
cr::Worker::my().startTX();
SCOPED_DEFER(cr::Worker::my().commitTX());
std::string copiedValue;
auto copyValueOut = [&](Slice val) {
copiedValue = std::string((const char*)val.data(), val.size());
Expand Down
Loading

0 comments on commit 1dbda17

Please sign in to comment.