Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 4 additions & 12 deletions src/observer/virtual_table/ob_all_virtual_memstore_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -268,20 +268,12 @@ int ObAllVirtualMemstoreInfo::process_curr_tenant(ObNewRow *&row)
cur_row_.cells_[i].set_int(mt->get_occupied_size());
break;
case OB_APP_MIN_COLUMN_ID + 10:
// hash_item_count
if (nullptr != data_memtable) {
cur_row_.cells_[i].set_int(data_memtable->get_hash_item_count());
} else {
cur_row_.cells_[i].set_int(0);
}
// hash_item_count (removed, always 0)
cur_row_.cells_[i].set_int(0);
break;
case OB_APP_MIN_COLUMN_ID + 11:
// hash_mem_used
if (nullptr != data_memtable) {
cur_row_.cells_[i].set_int(data_memtable->get_hash_alloc_memory());
} else {
cur_row_.cells_[i].set_int(0);
}
// hash_mem_used (removed, always 0)
cur_row_.cells_[i].set_int(0);
break;
case OB_APP_MIN_COLUMN_ID + 12:
// btree_item_count
Expand Down
136 changes: 135 additions & 1 deletion src/storage/memtable/mvcc/ob_keybtree.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ int BaseHandle<BtreeKey, BtreeVal>::acquire_ref()
}

template<typename BtreeKey, typename BtreeVal>
int GetHandle<BtreeKey, BtreeVal>::get(BtreeNode *root, BtreeKey key, BtreeVal &val)
int GetHandle<BtreeKey, BtreeVal>::get(BtreeNode *root, BtreeKey key, BtreeVal &val, BtreeKey **copy_inner_key)
{
int ret = OB_SUCCESS;
BtreeNode *leaf = nullptr;
Expand Down Expand Up @@ -269,6 +269,9 @@ int GetHandle<BtreeKey, BtreeVal>::get(BtreeNode *root, BtreeKey key, BtreeVal &
index->load(leaf->get_index());
}
val = leaf->get_val(pos, index);
if (OB_NOT_NULL(copy_inner_key)) {
*copy_inner_key = &leaf->get_key(pos, index);
}
} else {
ret = OB_ENTRY_NOT_EXIST;
}
Expand Down Expand Up @@ -603,6 +606,78 @@ int WriteHandle<BtreeKey, BtreeVal>::insert_and_split_upward(BtreeKey key, Btree
return ret;
}

template<typename BtreeKey, typename BtreeVal>
int WriteHandle<BtreeKey, BtreeVal>::insert_or_get_and_split_upward(
BtreeKey key, const BtreeKvCreator &creator, BtreeVal &val, BtreeNode *&new_root)
{
int ret = OB_SUCCESS;
int pos = -1;
bool is_found = false;
BtreeNode *old_node = nullptr;
BtreeNode *new_node_1 = nullptr;
BtreeNode *new_node_2 = nullptr;
MultibitSet *index = &this->index_;
BtreeKey new_insert_key;
UNUSED(this->path_.pop(old_node, pos)); // pop may failed, old_node is allowd to be NULL

if (OB_ISNULL(old_node)) {
if (OB_ISNULL(new_node_1 = alloc_node())) {
ret = OB_ALLOCATE_MEMORY_FAILED;
} else if (OB_FAIL(creator(false/*is_exist_key*/, new_insert_key, val))) {
OB_LOG(WARN, "fail to create value", K(key));
} else {
new_node_1->insert_into_node(0, new_insert_key, val);
}
} else if (this->path_.get_is_found()) {
is_found = true;
val = old_node->get_val(pos, index);
BtreeKey &exist_key = old_node->get_key(pos, index);
if (OB_FAIL(creator(true/*is_exist_key*/, exist_key, val))) {
OB_LOG(WARN, "fail to set exist key", K(key), K(exist_key));
}
} else {
if (OB_FAIL(creator(false/*is_exist_key*/, new_insert_key, val))) {
OB_LOG(WARN, "fail to create value", K(key));
} else {
ret = insert_into_node(old_node, pos, new_insert_key, val, new_node_1, new_node_2);
}
}
while (OB_SUCCESS == ret && OB_NOT_NULL(new_node_1)) {
if (OB_ISNULL(new_node_2)) {
if (OB_SUCCESS != this->path_.pop(old_node, pos)) {
new_root = new_node_1;
new_node_1 = nullptr;
} else if (pos < 0) {
ret = replace_child_and_key(old_node, 0, new_node_1->get_key(0, index), new_node_1, new_node_1);
} else {
ret = replace_child(old_node, pos, (BtreeVal)new_node_1);
new_node_1 = nullptr;
}
} else {
if (OB_SUCCESS != this->path_.pop(old_node, pos)) {
ret = this->make_new_root(new_root,
new_node_1->get_key(0, index),
new_node_1,
new_node_2->get_key(0),
new_node_2,
(int16_t)(new_node_1->get_level() + 1));
new_node_1 = nullptr;
new_node_2 = nullptr;
} else {
ret = split_child(old_node,
std::max(0, pos),
new_node_1->get_key(0, index),
(BtreeVal)new_node_1,
new_node_2->get_key(0, index),
(BtreeVal)new_node_2,
new_node_1,
new_node_2);
}
}
}
return ret;
}

template<typename BtreeKey, typename BtreeVal>
int WriteHandle<BtreeKey, BtreeVal>::insert_into_node(BtreeNode *old_node, int pos, BtreeKey key, BtreeVal val, BtreeNode *&new_node_1, BtreeNode *&new_node_2)
{
Expand Down Expand Up @@ -1524,6 +1599,47 @@ int ObKeyBtree<BtreeKey, BtreeVal>::insert(const BtreeKey key, BtreeVal &value)
return ret;
}

template<typename BtreeKey, typename BtreeVal>
int ObKeyBtree<BtreeKey, BtreeVal>::insert_or_get(const BtreeKey key,
const BtreeKvCreator &creator,
BtreeVal &val)
{
int ret = OB_SUCCESS;
BtreeNode *old_root = nullptr;
BtreeNode *new_root = nullptr;
WriteHandle handle(*this);
handle.get_is_in_delete() = false;
if (OB_FAIL(handle.acquire_ref())) {
OB_LOG(ERROR, "acquire_ref fail", K(ret));
} else {
ret = OB_EAGAIN;
}
while (OB_EAGAIN == ret) {
if (OB_FAIL(handle.find_path(old_root = ATOMIC_LOAD(&root_), key))) {
OB_LOG(ERROR, "path.search error", K(root_), K(ret));
} else if (OB_FAIL(handle.insert_or_get_and_split_upward(key, creator, val, new_root = old_root))) {
// do nothing
} else if (old_root != new_root) {
if (!ATOMIC_BCAS(&root_, old_root, new_root)) {
ret = OB_EAGAIN;
}
}
if (OB_EAGAIN == ret) {
handle.free_list();
}
}
handle.release_ref();
handle.retire(ret);
if (OB_SUCC(ret)) {
size_.inc(1);
} else if (OB_ALLOCATE_MEMORY_FAILED == ret) {
OB_LOG(WARN, "btree.insert_or_get(key) error", KR(ret), K(key), K(val));
} else {
OB_LOG(ERROR, "btree.insert_or_get(key) error", KR(ret), K(key), K(val));
}
return ret;
}

template<typename BtreeKey, typename BtreeVal>
int ObKeyBtree<BtreeKey, BtreeVal>::get(const BtreeKey key, BtreeVal &value)
{
Expand All @@ -1539,6 +1655,24 @@ int ObKeyBtree<BtreeKey, BtreeVal>::get(const BtreeKey key, BtreeVal &value)
return ret;
}

template<typename BtreeKey, typename BtreeVal>
int ObKeyBtree<BtreeKey, BtreeVal>::get(const BtreeKey key, BtreeVal &value, BtreeKey &copy_inner_key)
{
int ret = OB_SUCCESS;
BtreeKey *inner_key_ptr = nullptr;
GetHandle handle(*this);
if (OB_FAIL(handle.acquire_ref())) {
OB_LOG(ERROR, "acquire_ref fail", K(ret));
} else if (OB_FAIL(handle.get(ATOMIC_LOAD(&root_), key, value, &inner_key_ptr))) {
if (OB_UNLIKELY(OB_ENTRY_NOT_EXIST != ret)) {
OB_LOG(ERROR, "btree.get(key) fail", KR(ret), K(key), K(value));
}
} else if (OB_NOT_NULL(inner_key_ptr)) {
copy_inner_key = *inner_key_ptr;
}
return ret;
}

template<typename BtreeKey, typename BtreeVal>
int ObKeyBtree<BtreeKey, BtreeVal>::set_key_range(BtreeIterator &iter, const BtreeKey min_key, const bool start_exclude,
const BtreeKey max_key, const bool end_exclude) const
Expand Down
3 changes: 3 additions & 0 deletions src/storage/memtable/mvcc/ob_keybtree.h
Original file line number Diff line number Diff line change
Expand Up @@ -279,8 +279,11 @@ class ObKeyBtree
static int batch_destroy();

// ===================== Ob Btree Operator =====================
typedef typename WriteHandle::BtreeKvCreator BtreeKvCreator;
int insert(const BtreeKey key, BtreeVal &value);
int insert_or_get(const BtreeKey key, const BtreeKvCreator &creator, BtreeVal &val);
int get(const BtreeKey key, BtreeVal &value);
int get(const BtreeKey key, BtreeVal &value, BtreeKey &copy_inner_key);
int set_key_range(BtreeIterator &iter, const BtreeKey min_key, const bool start_exclude,
const BtreeKey max_key, const bool end_exclude) const;
int set_key_range(BtreeRawIterator &handle, const BtreeKey min_key, const bool start_exclude,
Expand Down
5 changes: 4 additions & 1 deletion src/storage/memtable/mvcc/ob_keybtree_deps.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#include "lib/ob_abort.h"
#include "lib/allocator/ob_retire_station.h"
#include "lib/function/ob_function.h"

#define BTREE_ASSERT(x) if (OB_UNLIKELY(!(x))) { ob_abort(); }

Expand Down Expand Up @@ -440,7 +441,7 @@ class GetHandle: public BaseHandle<BtreeKey, BtreeVal>
public:
GetHandle(ObKeyBtree &tree): BaseHandle(tree.get_qclock()) { UNUSED(tree); }
~GetHandle() {}
int get(BtreeNode *root, BtreeKey key, BtreeVal &val);
int get(BtreeNode *root, BtreeKey key, BtreeVal &val, BtreeKey **copy_inner_key = nullptr);
};

template<typename BtreeKey, typename BtreeVal>
Expand Down Expand Up @@ -561,6 +562,8 @@ class WriteHandle: public BaseHandle<BtreeKey, BtreeVal>
}
public:
int insert_and_split_upward(BtreeKey key, BtreeVal &val, BtreeNode *&new_root);
typedef ObFunction<int(const bool is_exist_key, BtreeKey &key, BtreeVal &val)> BtreeKvCreator;
int insert_or_get_and_split_upward(BtreeKey key, const BtreeKvCreator &creator, BtreeVal &val, BtreeNode *&new_root);
private:
int insert_into_node(BtreeNode *old_node, int pos, BtreeKey key, BtreeVal val, BtreeNode *&new_node_1, BtreeNode *&new_node_2);
// judge whether it's wrlocked
Expand Down
120 changes: 43 additions & 77 deletions src/storage/memtable/mvcc/ob_mvcc_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -256,9 +256,6 @@ int ObMvccEngine::check_row_locked(ObMvccAccessCtx &ctx,

int ObMvccEngine::create_kvs(const ObMemtableSetArg &memtable_set_arg,
ObMemtableKeyGenerator &memtable_key_generator,
// whether is normal insert and we can
// optimize to alloc first in the case
const bool is_normal_insert,
ObStoredKVs &kvs)
{
int ret = OB_SUCCESS;
Expand All @@ -270,11 +267,10 @@ int ObMvccEngine::create_kvs(const ObMemtableSetArg &memtable_set_arg,
for (int64_t i = 0; OB_SUCC(ret) && i < row_count; i++) {
if (OB_FAIL(memtable_key_generator.generate_memtable_key(new_rows[i]))) {
TRANS_LOG(WARN, "generate memtable key fail", K(ret), K(memtable_set_arg));
} else if (OB_FAIL(create_kv(&memtable_key_generator.get_memtable_key(),
is_normal_insert,
&kvs.at(i).key_,
kvs.at(i).value_))) {
TRANS_LOG(WARN, "create kv fail", K(ret), K(memtable_set_arg));
} else if (OB_FAIL(create_btree_kv_(&memtable_key_generator.get_memtable_key(),
&kvs.at(i).key_,
kvs.at(i).value_))) {
TRANS_LOG(WARN, "create btree kv fail", K(ret), K(memtable_set_arg));
} else if (nullptr != memtable_key_buffer &&
OB_FAIL(memtable_key_buffer->push_back(kvs[i].key_))) {
TRANS_LOG(WARN, "push back stored memtable key into buffer failed", K(ret));
Expand All @@ -285,62 +281,10 @@ int ObMvccEngine::create_kvs(const ObMemtableSetArg &memtable_set_arg,
}

int ObMvccEngine::create_kv(const ObMemtableKey *key,
const bool is_insert,
ObMemtableKey *stored_key,
ObMvccRow *&value)
{
int64_t loop_cnt = 0;
int ret = OB_SUCCESS;
value = nullptr;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
TRANS_LOG(WARN, "mvcc_engine not init", K(this));
} else {

while (OB_SUCCESS == ret && NULL == value) {
ObStoreRowkey *tmp_key = nullptr;
// We optimize the create_kv operation by skipping the first hash table
// get for insert operation because it is unnecessary at most cases. Under
// the concurrent inserts, we rely on the conflict on the hash table set
// and the while loops for the next hash table get to maintain the origin
// create_kv semantic
if (!(0 == loop_cnt // is the first try in the loop
&& is_insert) // is insert dml operation
&& OB_SUCC(query_engine_->get(key, value, stored_key))) {
if (NULL == value) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(WARN, "get NULL value");
}
} else if (OB_FAIL(kv_builder_->dup_key(tmp_key,
*engine_allocator_,
key->get_rowkey()))) {
TRANS_LOG(WARN, "key dup fail", K(ret));
ret = OB_ALLOCATE_MEMORY_FAILED;
} else if (OB_FAIL(stored_key->encode(tmp_key))) {
TRANS_LOG(WARN, "key encode fail", K(ret));
} else if (NULL == (value = (ObMvccRow *)engine_allocator_->alloc(sizeof(*value)))) {
TRANS_LOG(WARN, "alloc ObMvccRow fail");
ret = OB_ALLOCATE_MEMORY_FAILED;
} else {
value = new(value) ObMvccRow();
if (OB_SUCCESS == (ret = query_engine_->set(stored_key, value))) {
} else if (OB_ENTRY_EXIST == ret) {
ret = OB_SUCCESS;
value = NULL;
} else {
value = NULL;
}
}
loop_cnt++;
}
if (loop_cnt > 2) {
if (REACH_TIME_INTERVAL(10 * 1000 * 1000) || 3 == loop_cnt) {
TRANS_LOG(ERROR, "unexpected loop cnt when preparing kv", K(ret), K(loop_cnt), K(*key), K(*stored_key));
}
}
}

return ret;
return create_btree_kv_(key, stored_key, value);
}

int ObMvccEngine::mvcc_write(storage::ObStoreCtx &ctx,
Expand Down Expand Up @@ -503,27 +447,49 @@ void ObMvccEngine::finish_kvs(ObMvccWriteResults& results)
}
}

int ObMvccEngine::ensure_kv(const ObMemtableKey *stored_key,
ObMvccRow *value)
void ObMvccEngine::mvcc_undo(ObMvccRow *value)
{
int ret = OB_SUCCESS;
value->mvcc_undo();
}

if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
TRANS_LOG(WARN, "mvcc_engine not init", K(this));
} else {
ObRowLatchGuard guard(value->latch_);
if (OB_FAIL(query_engine_->ensure(stored_key,
value))) {
TRANS_LOG(WARN, "ensure_row fail", K(ret));
int ObMvccEngine::create_btree_kv_(const ObMemtableKey *key,
ObMemtableKey *stored_key,
ObMvccRow *&value)
{
int ret = OB_SUCCESS;
ObQueryEngine::ObMvccRowCreator row_creator = [this, key, stored_key](const bool is_exist_key,
ObStoreRowkeyWrapper &new_or_exist_key,
ObMvccRow *&new_row) {
int ret = OB_SUCCESS;
if (is_exist_key) {
stored_key->encode(new_or_exist_key.get_rowkey());
} else {
ObStoreRowkey *tmp_key = nullptr;
if (OB_FAIL(kv_builder_->dup_key(tmp_key, *engine_allocator_, key->get_rowkey()))) {
TRANS_LOG(WARN, "key dup fail", K(ret));
ret = OB_ALLOCATE_MEMORY_FAILED;
} else if (OB_ISNULL(new_row = (ObMvccRow *)engine_allocator_->alloc(sizeof(*new_row)))) {
TRANS_LOG(WARN, "alloc ObMvccRow fail", K(ret));
ret = OB_ALLOCATE_MEMORY_FAILED;
} else {
stored_key->encode(tmp_key);
new_or_exist_key = ObStoreRowkeyWrapper(tmp_key);
new_row = new(new_row) ObMvccRow();
new_row->set_btree_indexed();
}
}
return ret;
};

value = nullptr;
if (OB_FAIL(query_engine_->create_btree_kv(key, row_creator, value))) {
TRANS_LOG(WARN, "create btree kv fail", K(ret), KPC(key));
} else if (OB_ISNULL(value)) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(WARN, "get NULL value", K(ret), KPC(key));
}
return ret;
}

void ObMvccEngine::mvcc_undo(ObMvccRow *value)
{
value->mvcc_undo();
return ret;
}
}
}
Loading
Loading