Skip to content

Commit 1fff020

Browse files
committed
feat: support conditionally visit all buffer frames
1 parent 3ce8682 commit 1fff020

File tree

5 files changed

+147
-126
lines changed

5 files changed

+147
-126
lines changed

source/LeanStore.hpp

+4-2
Original file line numberDiff line numberDiff line change
@@ -138,8 +138,10 @@ class LeanStore {
138138
auto graveyardName = "_" + name + "_graveyard";
139139
btree = dynamic_cast<storage::btree::BTreeGeneric*>(
140140
storage::TreeRegistry::sInstance->GetTree(graveyardName));
141-
leanstore::storage::btree::BTreeGeneric::FreeAndReclaim(*btree);
142-
storage::TreeRegistry::sInstance->UnregisterTree(graveyardName);
141+
if (btree != nullptr) {
142+
leanstore::storage::btree::BTreeGeneric::FreeAndReclaim(*btree);
143+
storage::TreeRegistry::sInstance->UnregisterTree(graveyardName);
144+
}
143145
}
144146

145147
private:

source/storage/btree/BTreeVI.hpp

-3
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
// BTreeVI and BTreeVW are work in progress!
21
#pragma once
32

43
#include "BTreeLL.hpp"
@@ -12,8 +11,6 @@
1211

1312
#include <set>
1413

15-
using namespace leanstore::storage;
16-
1714
namespace leanstore {
1815
namespace storage {
1916
namespace btree {
+98-92
Original file line numberDiff line numberDiff line change
@@ -1,93 +1,95 @@
11
#pragma once
2+
23
#include "Units.hpp"
3-
// -------------------------------------------------------------------------------------
4-
// -------------------------------------------------------------------------------------
4+
55
#include <cstring>
6-
// -------------------------------------------------------------------------------------
6+
77
// TODO: Works only for update same size
88
#define DELTA_COPY
99
#ifdef DELTA_XOR
1010
// Obsolete
11-
#define beforeBody(Type, Attribute, tuple, entry) \
12-
const auto Attribute##_offset = offsetof(Type, Attribute); \
13-
const auto Attribute##_size = sizeof(Type::Attribute); \
14-
*reinterpret_cast<u16*>(entry) = Attribute##_offset; \
15-
entry += sizeof(u16); \
16-
*reinterpret_cast<u16*>(entry) = Attribute##_size; \
17-
entry += sizeof(u16); \
18-
std::memcpy(entry, tuple + Attribute##_offset, Attribute##_size); \
19-
entry += Attribute##_size;
11+
#define beforeBody(Type, Attribute, tuple, entry) \
12+
const auto Attribute##_offset = offsetof(Type, Attribute); \
13+
const auto Attribute##_size = sizeof(Type::Attribute); \
14+
*reinterpret_cast<u16*>(entry) = Attribute##_offset; \
15+
entry += sizeof(u16); \
16+
*reinterpret_cast<u16*>(entry) = Attribute##_size; \
17+
entry += sizeof(u16); \
18+
std::memcpy(entry, tuple + Attribute##_offset, Attribute##_size); \
19+
entry += Attribute##_size;
2020

21-
#define afterBody(Type, Attribute, tuple, entry) \
22-
const auto Attribute##_offset = offsetof(Type, Attribute); \
23-
const auto Attribute##_size = sizeof(Type::Attribute); \
24-
entry += (sizeof(u16) * 2); \
25-
for (u64 b_i = 0; b_i < Attribute##_size; b_i++) { \
26-
*(entry + b_i) ^= *(tuple + Attribute##_offset + b_i); \
27-
} \
28-
entry += Attribute##_size;
21+
#define afterBody(Type, Attribute, tuple, entry) \
22+
const auto Attribute##_offset = offsetof(Type, Attribute); \
23+
const auto Attribute##_size = sizeof(Type::Attribute); \
24+
entry += (sizeof(u16) * 2); \
25+
for (u64 b_i = 0; b_i < Attribute##_size; b_i++) { \
26+
*(entry + b_i) ^= *(tuple + Attribute##_offset + b_i); \
27+
} \
28+
entry += Attribute##_size;
2929
#endif
3030
#ifdef DELTA_COPY
31-
#define beforeBody(Type, Attribute, tuple, entry) \
32-
const auto Attribute##_offset = offsetof(Type, Attribute); \
33-
const auto Attribute##_size = sizeof(Type::Attribute); \
34-
*reinterpret_cast<u16*>(entry) = Attribute##_offset; \
35-
entry += sizeof(u16); \
36-
*reinterpret_cast<u16*>(entry) = Attribute##_size; \
37-
entry += sizeof(u16); \
38-
std::memcpy(entry, tuple + Attribute##_offset, Attribute##_size); \
39-
entry += 2 * Attribute##_size;
31+
#define beforeBody(Type, Attribute, tuple, entry) \
32+
const auto Attribute##_offset = offsetof(Type, Attribute); \
33+
const auto Attribute##_size = sizeof(Type::Attribute); \
34+
*reinterpret_cast<u16*>(entry) = Attribute##_offset; \
35+
entry += sizeof(u16); \
36+
*reinterpret_cast<u16*>(entry) = Attribute##_size; \
37+
entry += sizeof(u16); \
38+
std::memcpy(entry, tuple + Attribute##_offset, Attribute##_size); \
39+
entry += 2 * Attribute##_size;
4040

41-
#define afterBody(Type, Attribute, tuple, entry) \
42-
const auto Attribute##_offset = offsetof(Type, Attribute); \
43-
const auto Attribute##_size = sizeof(Type::Attribute); \
44-
entry += (sizeof(u16) * 2); \
45-
entry += Attribute##_size; \
46-
std::memcpy(entry, tuple + Attribute##_offset, Attribute##_size); \
47-
entry += 1 * Attribute##_size;
41+
#define afterBody(Type, Attribute, tuple, entry) \
42+
const auto Attribute##_offset = offsetof(Type, Attribute); \
43+
const auto Attribute##_size = sizeof(Type::Attribute); \
44+
entry += (sizeof(u16) * 2); \
45+
entry += Attribute##_size; \
46+
std::memcpy(entry, tuple + Attribute##_offset, Attribute##_size); \
47+
entry += 1 * Attribute##_size;
4848
#endif
4949

50-
#define beforeWrapper1(Type, A1) [](u8* tuple, u8* entry) { beforeBody(Type, A1, tuple, entry); }
51-
#define beforeWrapper2(Type, A1, A2) \
52-
[](u8* tuple, u8* entry) { \
53-
beforeBody(Type, A1, tuple, entry); \
54-
beforeBody(Type, A2, tuple, entry); \
55-
}
56-
#define beforeWrapper3(Type, A1, A2, A3) \
57-
[](u8* tuple, u8* entry) { \
58-
beforeBody(Type, A1, tuple, entry); \
59-
beforeBody(Type, A2, tuple, entry); \
60-
beforeBody(Type, A3, tuple, entry); \
61-
}
62-
#define beforeWrapper4(Type, A1, A2, A3, A4) \
63-
[](u8* tuple, u8* entry) { \
64-
beforeBody(Type, A1, tuple, entry); \
65-
beforeBody(Type, A2, tuple, entry); \
66-
beforeBody(Type, A3, tuple, entry); \
67-
beforeBody(Type, A4, tuple, entry); \
68-
}
50+
#define beforeWrapper1(Type, A1) \
51+
[](u8* tuple, u8* entry) { beforeBody(Type, A1, tuple, entry); }
52+
#define beforeWrapper2(Type, A1, A2) \
53+
[](u8* tuple, u8* entry) { \
54+
beforeBody(Type, A1, tuple, entry); \
55+
beforeBody(Type, A2, tuple, entry); \
56+
}
57+
#define beforeWrapper3(Type, A1, A2, A3) \
58+
[](u8* tuple, u8* entry) { \
59+
beforeBody(Type, A1, tuple, entry); \
60+
beforeBody(Type, A2, tuple, entry); \
61+
beforeBody(Type, A3, tuple, entry); \
62+
}
63+
#define beforeWrapper4(Type, A1, A2, A3, A4) \
64+
[](u8* tuple, u8* entry) { \
65+
beforeBody(Type, A1, tuple, entry); \
66+
beforeBody(Type, A2, tuple, entry); \
67+
beforeBody(Type, A3, tuple, entry); \
68+
beforeBody(Type, A4, tuple, entry); \
69+
}
6970

70-
#define afterWrapper1(Type, A1) [](u8* tuple, u8* entry) { afterBody(Type, A1, tuple, entry); }
71-
#define afterWrapper2(Type, A1, A2) \
72-
[](u8* tuple, u8* entry) { \
73-
afterBody(Type, A1, tuple, entry); \
74-
afterBody(Type, A2, tuple, entry); \
75-
}
71+
#define afterWrapper1(Type, A1) \
72+
[](u8* tuple, u8* entry) { afterBody(Type, A1, tuple, entry); }
73+
#define afterWrapper2(Type, A1, A2) \
74+
[](u8* tuple, u8* entry) { \
75+
afterBody(Type, A1, tuple, entry); \
76+
afterBody(Type, A2, tuple, entry); \
77+
}
7678

77-
#define afterWrapper3(Type, A1, A2, A3) \
78-
[](u8* tuple, u8* entry) { \
79-
afterBody(Type, A1, tuple, entry); \
80-
afterBody(Type, A2, tuple, entry); \
81-
afterBody(Type, A3, tuple, entry); \
82-
}
79+
#define afterWrapper3(Type, A1, A2, A3) \
80+
[](u8* tuple, u8* entry) { \
81+
afterBody(Type, A1, tuple, entry); \
82+
afterBody(Type, A2, tuple, entry); \
83+
afterBody(Type, A3, tuple, entry); \
84+
}
8385

84-
#define afterWrapper4(Type, A1, A2, A3, A4) \
85-
[](u8* tuple, u8* entry) { \
86-
afterBody(Type, A1, tuple, entry); \
87-
afterBody(Type, A2, tuple, entry); \
88-
afterBody(Type, A3, tuple, entry); \
89-
afterBody(Type, A4, tuple, entry); \
90-
}
86+
#define afterWrapper4(Type, A1, A2, A3, A4) \
87+
[](u8* tuple, u8* entry) { \
88+
afterBody(Type, A1, tuple, entry); \
89+
afterBody(Type, A2, tuple, entry); \
90+
afterBody(Type, A3, tuple, entry); \
91+
afterBody(Type, A4, tuple, entry); \
92+
}
9193

9294
#ifdef DELTA_XOR
9395
#define entrySize1(Type, A1) ((2 * sizeof(u16)) + (1 * sizeof(Type::A1)))
@@ -96,22 +98,26 @@
9698
#define entrySize1(Type, A1) ((2 * sizeof(u16)) + (2 * sizeof(Type::A1)))
9799
#endif
98100
#define entrySize2(Type, A1, A2) entrySize1(Type, A1) + entrySize1(Type, A2)
99-
#define entrySize3(Type, A1, A2, A3) entrySize1(Type, A1) + entrySize1(Type, A2) + entrySize1(Type, A3)
100-
#define entrySize4(Type, A1, A2, A3, A4) entrySize1(Type, A1) + entrySize1(Type, A2) + entrySize1(Type, A3) + entrySize1(Type, A4)
101+
#define entrySize3(Type, A1, A2, A3) \
102+
entrySize1(Type, A1) + entrySize1(Type, A2) + entrySize1(Type, A3)
103+
#define entrySize4(Type, A1, A2, A3, A4) \
104+
entrySize1(Type, A1) + entrySize1(Type, A2) + entrySize1(Type, A3) + \
105+
entrySize1(Type, A4)
101106

102-
#define WALUpdate1(Type, A1) \
103-
{ \
104-
beforeWrapper1(Type, A1), afterWrapper1(Type, A1), entrySize1(Type, A1) \
105-
}
106-
#define WALUpdate2(Type, A1, A2) \
107-
{ \
108-
beforeWrapper2(Type, A1, A2), afterWrapper2(Type, A1, A2), entrySize2(Type, A1, A2) \
109-
}
110-
#define WALUpdate3(Type, A1, A2, A3) \
111-
{ \
112-
beforeWrapper3(Type, A1, A2, A3), afterWrapper3(Type, A1, A2, A3), entrySize3(Type, A1, A2, A3) \
113-
}
114-
#define WALUpdate4(Type, A1, A2, A3, A4) \
115-
{ \
116-
beforeWrapper4(Type, A1, A2, A3, A4), afterWrapper4(Type, A1, A2, A3, A4), entrySize4(Type, A1, A2, A3, A4) \
117-
}
107+
#define WALUpdate1(Type, A1) \
108+
{ beforeWrapper1(Type, A1), afterWrapper1(Type, A1), entrySize1(Type, A1) }
109+
#define WALUpdate2(Type, A1, A2) \
110+
{ \
111+
beforeWrapper2(Type, A1, A2), afterWrapper2(Type, A1, A2), \
112+
entrySize2(Type, A1, A2) \
113+
}
114+
#define WALUpdate3(Type, A1, A2, A3) \
115+
{ \
116+
beforeWrapper3(Type, A1, A2, A3), afterWrapper3(Type, A1, A2, A3), \
117+
entrySize3(Type, A1, A2, A3) \
118+
}
119+
#define WALUpdate4(Type, A1, A2, A3, A4) \
120+
{ \
121+
beforeWrapper4(Type, A1, A2, A3, A4), afterWrapper4(Type, A1, A2, A3, A4), \
122+
entrySize4(Type, A1, A2, A3, A4) \
123+
}

source/storage/buffer-manager/BufferManager.cpp

+17
Original file line numberDiff line numberDiff line change
@@ -448,5 +448,22 @@ BufferManager::~BufferManager() {
448448
munmap(mBfs, totalMemSize);
449449
}
450450

451+
void BufferManager::DoWithBufferFrameIf(
452+
std::function<bool(BufferFrame& bf)> condition,
453+
std::function<void(BufferFrame& bf)> action) {
454+
utils::Parallelize::parallelRange(mNumBfs, [&](u64 begin, u64 end) {
455+
DCHECK(condition != nullptr);
456+
DCHECK(action != nullptr);
457+
for (u64 i = begin; i < end; i++) {
458+
auto& bf = mBfs[i];
459+
bf.header.mLatch.mutex.lock();
460+
if (condition(bf)) {
461+
action(bf);
462+
}
463+
bf.header.mLatch.mutex.unlock();
464+
}
465+
});
466+
}
467+
451468
} // namespace storage
452469
} // namespace leanstore

source/storage/buffer-manager/BufferManager.hpp

+28-29
Original file line numberDiff line numberDiff line change
@@ -46,44 +46,32 @@ class BufferManager {
4646
friend class leanstore::profiling::BMTable;
4747

4848
public:
49+
/// All the managed buffer frames in the memory.
4950
BufferFrame* mBfs;
5051

52+
/// FD for disk files storing pages.
5153
const int mPageFd;
5254

53-
// Free Pages. We reserve these extra pages to prevent segfaults
55+
/// Free Pages, reserved to to prevent segmentfaults.
5456
const u8 mNumSaftyBfs = 10;
5557

5658
// total number of dram buffer frames
5759
u64 mNumBfs;
5860

59-
// For cooling and inflight io
61+
/// For cooling and inflight io
6062
u64 mNumPartitions;
6163
u64 mPartitionsMask;
6264
std::vector<std::unique_ptr<Partition>> mPartitions;
65+
66+
/// All the buffer frame provider threads.
6367
std::vector<std::unique_ptr<BufferFrameProvider>> mBfProviders;
6468

6569
public:
66-
//---------------------------------------------------------------------------
67-
// Constructor and Destructors
68-
//---------------------------------------------------------------------------
6970
BufferManager(s32 pageFd);
70-
~BufferManager();
71-
72-
public:
73-
//---------------------------------------------------------------------------
74-
// Static fields
75-
//---------------------------------------------------------------------------
76-
77-
// Global buffer manager singleton instance, to be initialized.
78-
static std::unique_ptr<BufferManager> sInstance;
7971

80-
// Temporary hack: let workers evict the last page they used
81-
static thread_local BufferFrame* sTlsLastReadBf;
72+
~BufferManager();
8273

8374
public:
84-
//---------------------------------------------------------------------------
85-
// Object utils
86-
//---------------------------------------------------------------------------
8775
u64 getPartitionID(PID);
8876

8977
Partition& randomPartition();
@@ -92,20 +80,21 @@ class BufferManager {
9280

9381
BufferFrame& randomBufferFrame();
9482

95-
/**
96-
* Get a buffer frame from a random partition for a new page.
97-
*
98-
* NOTE: The buffer frame is initialized with an unused page ID, and is
99-
* exclusively locked.
100-
*/
83+
/// Get a buffer frame from a random partition for new page.
84+
///
85+
/// NOTE: The buffer frame is initialized with an unused page ID, and is
86+
/// exclusively locked.
10187
BufferFrame& AllocNewPage();
10288

103-
/// @brief Resolves the buffer frame pointed by the swipValue.
89+
/// Resolves the buffer frame pointed by the swipValue.
90+
///
10491
/// @param swipGuard The latch guard on the owner of the swip. Usually a swip
10592
/// is owned by a btree node, and the node should be latched before resolve
10693
/// the swips of child nodes.
94+
///
10795
/// @param swipValue The swip value from which to resolve the buffer frame.
10896
/// Usually a swip represents a btree node.
97+
///
10998
/// @return The buffer frame regarding to the swip.
11099
inline BufferFrame* tryFastResolveSwip(Guard& swipGuard,
111100
Swip<BufferFrame>& swipValue) {
@@ -126,9 +115,8 @@ class BufferManager {
126115
/// Reads the page at pageId to the destination buffer. All the pages are
127116
/// stored in one file (mPageFd), page id (pageId) determines the offset of
128117
/// the pageId-th page in the underlying file:
129-
///
130-
/// offset of pageId-th page: pageId * PAGE_SIZE
131-
/// size of each page: PAGE_SIZE
118+
/// 1. offset of pageId-th page: pageId * PAGE_SIZE
119+
/// 2. size of each page: PAGE_SIZE
132120
void readPageSync(PID pageId, void* destination);
133121

134122
void fDataSync();
@@ -150,6 +138,17 @@ class BufferManager {
150138
void deserialize(StringMap map);
151139

152140
u64 consumedPages();
141+
142+
/// Do something on all the buffer frames which satisify the condition
143+
void DoWithBufferFrameIf(std::function<bool(BufferFrame& bf)> condition,
144+
std::function<void(BufferFrame& bf)> action);
145+
146+
public:
147+
/// Global buffer manager singleton instance, lazily initialized.
148+
static std::unique_ptr<BufferManager> sInstance;
149+
150+
/// Temporary hack: let workers evict the last page they used
151+
static thread_local BufferFrame* sTlsLastReadBf;
153152
};
154153

155154
} // namespace storage

0 commit comments

Comments
 (0)