From 5c5441193e82e42e4587f5b9bc8f432bb7841dc5 Mon Sep 17 00:00:00 2001 From: cao1629 Date: Mon, 23 Mar 2026 16:34:51 +0800 Subject: [PATCH] Remove hashtable dependency from memtable Use btree as the sole index in ObQueryEngine. Add insert_or_get() to ObKeyBtree for atomic insert-or-get semantics. Fold hashtable-based create_kv/ensure_kv into btree-only path. Point lookups now go directly through ObQueryEngine::get() (btree lookup). --- .../ob_all_virtual_memstore_info.cpp | 16 +- src/storage/memtable/mvcc/ob_keybtree.cpp | 136 +++- src/storage/memtable/mvcc/ob_keybtree.h | 3 + src/storage/memtable/mvcc/ob_keybtree_deps.h | 5 +- src/storage/memtable/mvcc/ob_mvcc_engine.cpp | 120 +-- src/storage/memtable/mvcc/ob_mvcc_engine.h | 19 +- src/storage/memtable/mvcc/ob_query_engine.cpp | 112 +-- src/storage/memtable/mvcc/ob_query_engine.h | 34 +- src/storage/memtable/ob_memtable.cpp | 40 +- src/storage/memtable/ob_memtable.h | 2 - src/storage/memtable/ob_mt_hash.h | 735 ------------------ .../storage/memtable/repeat_test_mt_hash.sh | 20 - unittest/storage/memtable/test_mt_hash.cpp | 262 ------- 13 files changed, 223 insertions(+), 1281 deletions(-) delete mode 100644 src/storage/memtable/ob_mt_hash.h delete mode 100755 unittest/storage/memtable/repeat_test_mt_hash.sh delete mode 100644 unittest/storage/memtable/test_mt_hash.cpp diff --git a/src/observer/virtual_table/ob_all_virtual_memstore_info.cpp b/src/observer/virtual_table/ob_all_virtual_memstore_info.cpp index f491db3fd..0733cd7ef 100644 --- a/src/observer/virtual_table/ob_all_virtual_memstore_info.cpp +++ b/src/observer/virtual_table/ob_all_virtual_memstore_info.cpp @@ -268,20 +268,12 @@ int ObAllVirtualMemstoreInfo::process_curr_tenant(ObNewRow *&row) cur_row_.cells_[i].set_int(mt->get_occupied_size()); break; case OB_APP_MIN_COLUMN_ID + 10: - // hash_item_count - if (nullptr != data_memtable) { - cur_row_.cells_[i].set_int(data_memtable->get_hash_item_count()); - } else { - cur_row_.cells_[i].set_int(0); - } + // hash_item_count (removed, always 0) + cur_row_.cells_[i].set_int(0); break; case OB_APP_MIN_COLUMN_ID + 11: - // hash_mem_used - if (nullptr != data_memtable) { - cur_row_.cells_[i].set_int(data_memtable->get_hash_alloc_memory()); - } else { - cur_row_.cells_[i].set_int(0); - } + // hash_mem_used (removed, always 0) + cur_row_.cells_[i].set_int(0); break; case OB_APP_MIN_COLUMN_ID + 12: // btree_item_count diff --git a/src/storage/memtable/mvcc/ob_keybtree.cpp b/src/storage/memtable/mvcc/ob_keybtree.cpp index b969f4a47..c8fff90ff 100644 --- a/src/storage/memtable/mvcc/ob_keybtree.cpp +++ b/src/storage/memtable/mvcc/ob_keybtree.cpp @@ -237,7 +237,7 @@ int BaseHandle::acquire_ref() } template -int GetHandle::get(BtreeNode *root, BtreeKey key, BtreeVal &val) +int GetHandle::get(BtreeNode *root, BtreeKey key, BtreeVal &val, BtreeKey **copy_inner_key) { int ret = OB_SUCCESS; BtreeNode *leaf = nullptr; @@ -269,6 +269,9 @@ int GetHandle::get(BtreeNode *root, BtreeKey key, BtreeVal & index->load(leaf->get_index()); } val = leaf->get_val(pos, index); + if (OB_NOT_NULL(copy_inner_key)) { + *copy_inner_key = &leaf->get_key(pos, index); + } } else { ret = OB_ENTRY_NOT_EXIST; } @@ -603,6 +606,78 @@ int WriteHandle::insert_and_split_upward(BtreeKey key, Btree return ret; } +template +int WriteHandle::insert_or_get_and_split_upward( + BtreeKey key, const BtreeKvCreator &creator, BtreeVal &val, BtreeNode *&new_root) +{ + int ret = OB_SUCCESS; + int pos = -1; + bool is_found = false; + BtreeNode *old_node = nullptr; + BtreeNode *new_node_1 = nullptr; + BtreeNode *new_node_2 = nullptr; + MultibitSet *index = &this->index_; + BtreeKey new_insert_key; + UNUSED(this->path_.pop(old_node, pos)); // pop may failed, old_node is allowd to be NULL + + if (OB_ISNULL(old_node)) { + if (OB_ISNULL(new_node_1 = alloc_node())) { + ret = OB_ALLOCATE_MEMORY_FAILED; + } else if (OB_FAIL(creator(false/*is_exist_key*/, new_insert_key, val))) { + OB_LOG(WARN, "fail to create value", K(key)); + } else { + new_node_1->insert_into_node(0, new_insert_key, val); + } + } else if (this->path_.get_is_found()) { + is_found = true; + val = old_node->get_val(pos, index); + BtreeKey &exist_key = old_node->get_key(pos, index); + if (OB_FAIL(creator(true/*is_exist_key*/, exist_key, val))) { + OB_LOG(WARN, "fail to set exist key", K(key), K(exist_key)); + } + } else { + if (OB_FAIL(creator(false/*is_exist_key*/, new_insert_key, val))) { + OB_LOG(WARN, "fail to create value", K(key)); + } else { + ret = insert_into_node(old_node, pos, new_insert_key, val, new_node_1, new_node_2); + } + } + while (OB_SUCCESS == ret && OB_NOT_NULL(new_node_1)) { + if (OB_ISNULL(new_node_2)) { + if (OB_SUCCESS != this->path_.pop(old_node, pos)) { + new_root = new_node_1; + new_node_1 = nullptr; + } else if (pos < 0) { + ret = replace_child_and_key(old_node, 0, new_node_1->get_key(0, index), new_node_1, new_node_1); + } else { + ret = replace_child(old_node, pos, (BtreeVal)new_node_1); + new_node_1 = nullptr; + } + } else { + if (OB_SUCCESS != this->path_.pop(old_node, pos)) { + ret = this->make_new_root(new_root, + new_node_1->get_key(0, index), + new_node_1, + new_node_2->get_key(0), + new_node_2, + (int16_t)(new_node_1->get_level() + 1)); + new_node_1 = nullptr; + new_node_2 = nullptr; + } else { + ret = split_child(old_node, + std::max(0, pos), + new_node_1->get_key(0, index), + (BtreeVal)new_node_1, + new_node_2->get_key(0, index), + (BtreeVal)new_node_2, + new_node_1, + new_node_2); + } + } + } + return ret; +} + template int WriteHandle::insert_into_node(BtreeNode *old_node, int pos, BtreeKey key, BtreeVal val, BtreeNode *&new_node_1, BtreeNode *&new_node_2) { @@ -1524,6 +1599,47 @@ int ObKeyBtree::insert(const BtreeKey key, BtreeVal &value) return ret; } +template +int ObKeyBtree::insert_or_get(const BtreeKey key, + const BtreeKvCreator &creator, + BtreeVal &val) +{ + int ret = OB_SUCCESS; + BtreeNode *old_root = nullptr; + BtreeNode *new_root = nullptr; + WriteHandle handle(*this); + handle.get_is_in_delete() = false; + if (OB_FAIL(handle.acquire_ref())) { + OB_LOG(ERROR, "acquire_ref fail", K(ret)); + } else { + ret = OB_EAGAIN; + } + while (OB_EAGAIN == ret) { + if (OB_FAIL(handle.find_path(old_root = ATOMIC_LOAD(&root_), key))) { + OB_LOG(ERROR, "path.search error", K(root_), K(ret)); + } else if (OB_FAIL(handle.insert_or_get_and_split_upward(key, creator, val, new_root = old_root))) { + // do nothing + } else if (old_root != new_root) { + if (!ATOMIC_BCAS(&root_, old_root, new_root)) { + ret = OB_EAGAIN; + } + } + if (OB_EAGAIN == ret) { + handle.free_list(); + } + } + handle.release_ref(); + handle.retire(ret); + if (OB_SUCC(ret)) { + size_.inc(1); + } else if (OB_ALLOCATE_MEMORY_FAILED == ret) { + OB_LOG(WARN, "btree.insert_or_get(key) error", KR(ret), K(key), K(val)); + } else { + OB_LOG(ERROR, "btree.insert_or_get(key) error", KR(ret), K(key), K(val)); + } + return ret; +} + template int ObKeyBtree::get(const BtreeKey key, BtreeVal &value) { @@ -1539,6 +1655,24 @@ int ObKeyBtree::get(const BtreeKey key, BtreeVal &value) return ret; } +template +int ObKeyBtree::get(const BtreeKey key, BtreeVal &value, BtreeKey ©_inner_key) +{ + int ret = OB_SUCCESS; + BtreeKey *inner_key_ptr = nullptr; + GetHandle handle(*this); + if (OB_FAIL(handle.acquire_ref())) { + OB_LOG(ERROR, "acquire_ref fail", K(ret)); + } else if (OB_FAIL(handle.get(ATOMIC_LOAD(&root_), key, value, &inner_key_ptr))) { + if (OB_UNLIKELY(OB_ENTRY_NOT_EXIST != ret)) { + OB_LOG(ERROR, "btree.get(key) fail", KR(ret), K(key), K(value)); + } + } else if (OB_NOT_NULL(inner_key_ptr)) { + copy_inner_key = *inner_key_ptr; + } + return ret; +} + template int ObKeyBtree::set_key_range(BtreeIterator &iter, const BtreeKey min_key, const bool start_exclude, const BtreeKey max_key, const bool end_exclude) const diff --git a/src/storage/memtable/mvcc/ob_keybtree.h b/src/storage/memtable/mvcc/ob_keybtree.h index b3ce3f3e0..c57343387 100644 --- a/src/storage/memtable/mvcc/ob_keybtree.h +++ b/src/storage/memtable/mvcc/ob_keybtree.h @@ -279,8 +279,11 @@ class ObKeyBtree static int batch_destroy(); // ===================== Ob Btree Operator ===================== + typedef typename WriteHandle::BtreeKvCreator BtreeKvCreator; int insert(const BtreeKey key, BtreeVal &value); + int insert_or_get(const BtreeKey key, const BtreeKvCreator &creator, BtreeVal &val); int get(const BtreeKey key, BtreeVal &value); + int get(const BtreeKey key, BtreeVal &value, BtreeKey ©_inner_key); int set_key_range(BtreeIterator &iter, const BtreeKey min_key, const bool start_exclude, const BtreeKey max_key, const bool end_exclude) const; int set_key_range(BtreeRawIterator &handle, const BtreeKey min_key, const bool start_exclude, diff --git a/src/storage/memtable/mvcc/ob_keybtree_deps.h b/src/storage/memtable/mvcc/ob_keybtree_deps.h index c58f621b2..dc08b6af1 100644 --- a/src/storage/memtable/mvcc/ob_keybtree_deps.h +++ b/src/storage/memtable/mvcc/ob_keybtree_deps.h @@ -19,6 +19,7 @@ #include "lib/ob_abort.h" #include "lib/allocator/ob_retire_station.h" +#include "lib/function/ob_function.h" #define BTREE_ASSERT(x) if (OB_UNLIKELY(!(x))) { ob_abort(); } @@ -440,7 +441,7 @@ class GetHandle: public BaseHandle public: GetHandle(ObKeyBtree &tree): BaseHandle(tree.get_qclock()) { UNUSED(tree); } ~GetHandle() {} - int get(BtreeNode *root, BtreeKey key, BtreeVal &val); + int get(BtreeNode *root, BtreeKey key, BtreeVal &val, BtreeKey **copy_inner_key = nullptr); }; template @@ -561,6 +562,8 @@ class WriteHandle: public BaseHandle } public: int insert_and_split_upward(BtreeKey key, BtreeVal &val, BtreeNode *&new_root); + typedef ObFunction BtreeKvCreator; + int insert_or_get_and_split_upward(BtreeKey key, const BtreeKvCreator &creator, BtreeVal &val, BtreeNode *&new_root); private: int insert_into_node(BtreeNode *old_node, int pos, BtreeKey key, BtreeVal val, BtreeNode *&new_node_1, BtreeNode *&new_node_2); // judge whether it's wrlocked diff --git a/src/storage/memtable/mvcc/ob_mvcc_engine.cpp b/src/storage/memtable/mvcc/ob_mvcc_engine.cpp index c4302dd7f..a50a80389 100644 --- a/src/storage/memtable/mvcc/ob_mvcc_engine.cpp +++ b/src/storage/memtable/mvcc/ob_mvcc_engine.cpp @@ -256,9 +256,6 @@ int ObMvccEngine::check_row_locked(ObMvccAccessCtx &ctx, int ObMvccEngine::create_kvs(const ObMemtableSetArg &memtable_set_arg, ObMemtableKeyGenerator &memtable_key_generator, - // whether is normal insert and we can - // optimize to alloc first in the case - const bool is_normal_insert, ObStoredKVs &kvs) { int ret = OB_SUCCESS; @@ -270,11 +267,10 @@ int ObMvccEngine::create_kvs(const ObMemtableSetArg &memtable_set_arg, for (int64_t i = 0; OB_SUCC(ret) && i < row_count; i++) { if (OB_FAIL(memtable_key_generator.generate_memtable_key(new_rows[i]))) { TRANS_LOG(WARN, "generate memtable key fail", K(ret), K(memtable_set_arg)); - } else if (OB_FAIL(create_kv(&memtable_key_generator.get_memtable_key(), - is_normal_insert, - &kvs.at(i).key_, - kvs.at(i).value_))) { - TRANS_LOG(WARN, "create kv fail", K(ret), K(memtable_set_arg)); + } else if (OB_FAIL(create_btree_kv_(&memtable_key_generator.get_memtable_key(), + &kvs.at(i).key_, + kvs.at(i).value_))) { + TRANS_LOG(WARN, "create btree kv fail", K(ret), K(memtable_set_arg)); } else if (nullptr != memtable_key_buffer && OB_FAIL(memtable_key_buffer->push_back(kvs[i].key_))) { TRANS_LOG(WARN, "push back stored memtable key into buffer failed", K(ret)); @@ -285,62 +281,10 @@ int ObMvccEngine::create_kvs(const ObMemtableSetArg &memtable_set_arg, } int ObMvccEngine::create_kv(const ObMemtableKey *key, - const bool is_insert, ObMemtableKey *stored_key, ObMvccRow *&value) { - int64_t loop_cnt = 0; - int ret = OB_SUCCESS; - value = nullptr; - if (IS_NOT_INIT) { - ret = OB_NOT_INIT; - TRANS_LOG(WARN, "mvcc_engine not init", K(this)); - } else { - - while (OB_SUCCESS == ret && NULL == value) { - ObStoreRowkey *tmp_key = nullptr; - // We optimize the create_kv operation by skipping the first hash table - // get for insert operation because it is unnecessary at most cases. Under - // the concurrent inserts, we rely on the conflict on the hash table set - // and the while loops for the next hash table get to maintain the origin - // create_kv semantic - if (!(0 == loop_cnt // is the first try in the loop - && is_insert) // is insert dml operation - && OB_SUCC(query_engine_->get(key, value, stored_key))) { - if (NULL == value) { - ret = OB_ERR_UNEXPECTED; - TRANS_LOG(WARN, "get NULL value"); - } - } else if (OB_FAIL(kv_builder_->dup_key(tmp_key, - *engine_allocator_, - key->get_rowkey()))) { - TRANS_LOG(WARN, "key dup fail", K(ret)); - ret = OB_ALLOCATE_MEMORY_FAILED; - } else if (OB_FAIL(stored_key->encode(tmp_key))) { - TRANS_LOG(WARN, "key encode fail", K(ret)); - } else if (NULL == (value = (ObMvccRow *)engine_allocator_->alloc(sizeof(*value)))) { - TRANS_LOG(WARN, "alloc ObMvccRow fail"); - ret = OB_ALLOCATE_MEMORY_FAILED; - } else { - value = new(value) ObMvccRow(); - if (OB_SUCCESS == (ret = query_engine_->set(stored_key, value))) { - } else if (OB_ENTRY_EXIST == ret) { - ret = OB_SUCCESS; - value = NULL; - } else { - value = NULL; - } - } - loop_cnt++; - } - if (loop_cnt > 2) { - if (REACH_TIME_INTERVAL(10 * 1000 * 1000) || 3 == loop_cnt) { - TRANS_LOG(ERROR, "unexpected loop cnt when preparing kv", K(ret), K(loop_cnt), K(*key), K(*stored_key)); - } - } - } - - return ret; + return create_btree_kv_(key, stored_key, value); } int ObMvccEngine::mvcc_write(storage::ObStoreCtx &ctx, @@ -503,27 +447,49 @@ void ObMvccEngine::finish_kvs(ObMvccWriteResults& results) } } -int ObMvccEngine::ensure_kv(const ObMemtableKey *stored_key, - ObMvccRow *value) +void ObMvccEngine::mvcc_undo(ObMvccRow *value) { - int ret = OB_SUCCESS; + value->mvcc_undo(); +} - if (IS_NOT_INIT) { - ret = OB_NOT_INIT; - TRANS_LOG(WARN, "mvcc_engine not init", K(this)); - } else { - ObRowLatchGuard guard(value->latch_); - if (OB_FAIL(query_engine_->ensure(stored_key, - value))) { - TRANS_LOG(WARN, "ensure_row fail", K(ret)); +int ObMvccEngine::create_btree_kv_(const ObMemtableKey *key, + ObMemtableKey *stored_key, + ObMvccRow *&value) +{ + int ret = OB_SUCCESS; + ObQueryEngine::ObMvccRowCreator row_creator = [this, key, stored_key](const bool is_exist_key, + ObStoreRowkeyWrapper &new_or_exist_key, + ObMvccRow *&new_row) { + int ret = OB_SUCCESS; + if (is_exist_key) { + stored_key->encode(new_or_exist_key.get_rowkey()); + } else { + ObStoreRowkey *tmp_key = nullptr; + if (OB_FAIL(kv_builder_->dup_key(tmp_key, *engine_allocator_, key->get_rowkey()))) { + TRANS_LOG(WARN, "key dup fail", K(ret)); + ret = OB_ALLOCATE_MEMORY_FAILED; + } else if (OB_ISNULL(new_row = (ObMvccRow *)engine_allocator_->alloc(sizeof(*new_row)))) { + TRANS_LOG(WARN, "alloc ObMvccRow fail", K(ret)); + ret = OB_ALLOCATE_MEMORY_FAILED; + } else { + stored_key->encode(tmp_key); + new_or_exist_key = ObStoreRowkeyWrapper(tmp_key); + new_row = new(new_row) ObMvccRow(); + new_row->set_btree_indexed(); + } } + return ret; + }; + + value = nullptr; + if (OB_FAIL(query_engine_->create_btree_kv(key, row_creator, value))) { + TRANS_LOG(WARN, "create btree kv fail", K(ret), KPC(key)); + } else if (OB_ISNULL(value)) { + ret = OB_ERR_UNEXPECTED; + TRANS_LOG(WARN, "get NULL value", K(ret), KPC(key)); } - return ret; -} -void ObMvccEngine::mvcc_undo(ObMvccRow *value) -{ - value->mvcc_undo(); + return ret; } } } diff --git a/src/storage/memtable/mvcc/ob_mvcc_engine.h b/src/storage/memtable/mvcc/ob_mvcc_engine.h index fac8fa40a..c5281187a 100644 --- a/src/storage/memtable/mvcc/ob_mvcc_engine.h +++ b/src/storage/memtable/mvcc/ob_mvcc_engine.h @@ -69,16 +69,12 @@ class ObMvccEngine // Return the ObMvccRow according to the memtable key or create the new one if // the memtable key is not exist. int create_kv(const ObMemtableKey *key, - const bool is_insert, ObMemtableKey *stored_key, ObMvccRow *&value); // Return the ObStoreRowkey and ObMvccRow pair according to the memtable key // or create all unexisted ones if some of the memtable key are not exist. int create_kvs(const ObMemtableSetArg &memtable_set_arg, ObMemtableKeyGenerator &memtable_key_generator, - // whether is normal insert and we can - // optimize to alloc first in the case - const bool is_normal_insert, ObStoredKVs &kvs); // mvcc_write builds the ObMvccTransNode according to the arg and write @@ -107,11 +103,6 @@ class ObMvccEngine int mvcc_replay(const ObTxNodeArg &arg, ObMvccReplayResult &res); - // ensure_kv is used to make sure b-tree is no longer broken by the deleted - // row. - int ensure_kv(const ObMemtableKey *stored_key, - ObMvccRow *value); - // finish_kv is used to make tx_node visible to outer read void finish_kv(ObMvccWriteResult& res); void finish_kvs(ObMvccWriteResults& results); @@ -153,12 +144,10 @@ class ObMvccEngine int init_tx_node_(const ObTxNodeArg &arg, ObMvccTransNode *node); - int batch_alloc_kv_and_set_(const int64_t count, - const int64_t key_data_size, - const ObMemtableKeyGenerator &keys, - ObStoredKVs &kvs, - int64_t &finished_cnt); -private: + int create_btree_kv_(const ObMemtableKey *key, + ObMemtableKey *stored_key, + ObMvccRow *&value); + DISALLOW_COPY_AND_ASSIGN(ObMvccEngine); bool is_inited_; ObMTKVBuilder *kv_builder_; diff --git a/src/storage/memtable/mvcc/ob_query_engine.cpp b/src/storage/memtable/mvcc/ob_query_engine.cpp index bd10a4688..756b147fd 100644 --- a/src/storage/memtable/mvcc/ob_query_engine.cpp +++ b/src/storage/memtable/mvcc/ob_query_engine.cpp @@ -112,42 +112,6 @@ void ObQueryEngine::dump2text(FILE* fd) } } -int ObQueryEngine::dump_keyhash(FILE *fd) const -{ - int ret = OB_SUCCESS; - const bool print_bucket_node = true; - const bool print_row_value = false; // basic info of ObMvccRow - const bool print_row_value_verbose = false; // data part of ObMvccRow - - if (IS_NOT_INIT) { - ret = OB_NOT_INIT; - TRANS_LOG(WARN, "not init", "this", this); - } else if (OB_ISNULL(fd)) { - ret = OB_INVALID_ARGUMENT; - TRANS_LOG(WARN, "invalid param", KP(fd)); - } else { - keyhash_.dump_hash(fd, - print_bucket_node, - print_row_value, - print_row_value_verbose); - } - - return ret; -} - - -int64_t ObQueryEngine::hash_size() const -{ - int64_t arr_size = keyhash_.get_arr_size(); - return arr_size; -} - -int64_t ObQueryEngine::hash_alloc_memory() const -{ - int64_t alloc_mem = keyhash_.get_alloc_memory(); - return alloc_mem; -} - int64_t ObQueryEngine::btree_size() const { int64_t obj_cnt = keybtree_.size(); @@ -182,7 +146,6 @@ void ObQueryEngine::destroy() { keybtree_.destroy(true /*is_batch_destroy*/); btree_allocator_.reset(); - keyhash_.destroy(); is_inited_ = false; } @@ -191,39 +154,9 @@ void ObQueryEngine::pre_batch_destroy_keybtree() (void)keybtree_.pre_batch_destroy(); } -// The hashmap is thread safe and only one of concurrency inserts will succeed. -// While others will get the OB_ENTRY_EXIST error code. -int ObQueryEngine::set(const ObMemtableKey *key, ObMvccRow *value) -{ - int ret = OB_SUCCESS; - - if (IS_NOT_INIT) { - TRANS_LOG(WARN, "not init", KP(this)); - ret = OB_NOT_INIT; - } else if (OB_ISNULL(key) || OB_ISNULL(value)) { - ret = OB_INVALID_ARGUMENT; - TRANS_LOG(WARN, "invalid param when query_engine set", - KR(ret), KP(key), KP(value)); - } else { - ObStoreRowkeyWrapper key_wrapper(key->get_rowkey()); - if (OB_FAIL(keyhash_.insert(&key_wrapper, value))) { - if (OB_ENTRY_EXIST != ret) { - TRANS_LOG(WARN, "put to keyhash fail", K(ret), - KPC(key), KPC(value)); - } - } else { - value->set_hash_indexed(); - } - } - if (OB_FAIL(ret) && OB_ENTRY_EXIST != ret) { - TRANS_LOG(WARN, "query engine set fail", KR(ret), K(ret), KPC(key), KPC(value)); - } - return ret; -} - int ObQueryEngine::get(const ObMemtableKey *parameter_key, - ObMvccRow *&row, - ObMemtableKey *returned_key) + ObMvccRow *&row, + ObMemtableKey *returned_key) { int ret = OB_SUCCESS; row = nullptr; @@ -236,50 +169,39 @@ int ObQueryEngine::get(const ObMemtableKey *parameter_key, TRANS_LOG(WARN, "invalid param", KP(parameter_key)); } else { const ObStoreRowkeyWrapper parameter_key_wrapper(parameter_key->get_rowkey()); - const ObStoreRowkeyWrapper *copy_inner_key_wrapper = nullptr; - if (OB_FAIL(keyhash_.get(¶meter_key_wrapper, row, copy_inner_key_wrapper))) { + if (OB_FAIL(keybtree_.get(parameter_key_wrapper, row))) { if (OB_ENTRY_NOT_EXIST != ret) { - TRANS_LOG(WARN, "get from keyhash fail", KR(ret), KPC(parameter_key)); + TRANS_LOG(WARN, "get from keybtree fail", KR(ret), KPC(parameter_key)); } row = nullptr; } else if (OB_ISNULL(row)) { ret = OB_ERR_UNEXPECTED; - TRANS_LOG(ERROR, "get NULL value from keyhash", KR(ret), KPC(parameter_key)); - } else if (returned_key) { - ret = returned_key->encode(copy_inner_key_wrapper->get_rowkey()); + TRANS_LOG(ERROR, "get NULL value from keybtree", KR(ret), KPC(parameter_key)); } } - return ret; } -// The caller need to guarantee the mutual exclusive enforced here, otherwise -// the concurrent modification will violate the rules of the btree(the ERROR -// will be reported if two same key is inserted into keybtree successively) -int ObQueryEngine::ensure(const ObMemtableKey *key, ObMvccRow *value) +int ObQueryEngine::create_btree_kv(const ObMemtableKey *parameter_key, + const ObMvccRowCreator &row_creator, + ObMvccRow *&row) { int ret = OB_SUCCESS; if (IS_NOT_INIT) { - TRANS_LOG(WARN, "not init", "this", this); + TRANS_LOG(WARN, "not init", K(this)); ret = OB_NOT_INIT; - } else if (OB_ISNULL(key) || OB_ISNULL(value)) { + } else if (OB_ISNULL(parameter_key)) { + ret = OB_INVALID_ARGUMENT; + TRANS_LOG(WARN, "invalid param", KP(parameter_key)); + } else if (OB_UNLIKELY(!row_creator.is_valid())) { ret = OB_INVALID_ARGUMENT; - TRANS_LOG(WARN, "query_engine ensure error, invalid param", KR(ret), KP(key), KP(value)); + TRANS_LOG(WARN, "invalid row creator", K(row_creator)); } else { - if (!value->is_btree_indexed()) { - ObStoreRowkeyWrapper key_wrapper(key->get_rowkey()); - if (OB_FAIL(keybtree_.insert(key_wrapper, value))) { - TRANS_LOG(WARN, "ensure keybtree fail", KR(ret), K(*key)); - if (OB_ENTRY_EXIST == ret) { - TRANS_LOG(ERROR, "ensure keybtree fail", KR(ret), K(*key)); - } else { - TRANS_LOG(WARN, "ensure keybtree fail", KR(ret), K(*key)); - } - } else { - value->set_btree_indexed(); - } + const ObStoreRowkeyWrapper parameter_key_wrapper(parameter_key->get_rowkey()); + if (OB_FAIL(keybtree_.insert_or_get(parameter_key_wrapper, row_creator, row))) { + TRANS_LOG(WARN, "fail to insert or get", KR(ret), KPC(parameter_key)); } } diff --git a/src/storage/memtable/mvcc/ob_query_engine.h b/src/storage/memtable/mvcc/ob_query_engine.h index 47857dfc9..e218b7204 100644 --- a/src/storage/memtable/mvcc/ob_query_engine.h +++ b/src/storage/memtable/mvcc/ob_query_engine.h @@ -18,11 +18,11 @@ #define OCEANBASE_MEMTABLE_MVCC_OB_QUERY_ENGINE_ #include "lib/container/ob_iarray.h" +#include "lib/function/ob_function.h" #include "lib/oblog/ob_log_module.h" #include "lib/objectpool/ob_concurrency_objpool.h" #include "storage/memtable/mvcc/ob_keybtree.h" #include "storage/memtable/ob_memtable_key.h" -#include "storage/memtable/ob_mt_hash.h" namespace oceanbase { @@ -61,9 +61,7 @@ class ObIQueryEngineIterator virtual void reset() = 0; }; -// ObQueryEngine consists of hashtable and btree. We will maintain key and value -// into both of the hashtable and btree and use them to complete efficient -// point select and range query operations +// ObQueryEngine uses a btree for both point lookups and range queries class ObQueryEngine { public: @@ -75,8 +73,8 @@ class ObQueryEngine typedef keybtree::BtreeIterator BtreeIterator; // Used only for estimation. typedef keybtree::BtreeRawIterator BtreeRawIterator; - // hashtable for point select - typedef ObMtHash KeyHash; + // callback for creating btree kv + typedef ObFunction ObMvccRowCreator; // ObQueryEngine Iterator implements the iterator interface template @@ -155,24 +153,17 @@ class ObQueryEngine : is_inited_(false), memstore_allocator_(memstore_allocator), btree_allocator_(memstore_allocator_), - keybtree_(btree_allocator_), - keyhash_(memstore_allocator_) {} + keybtree_(btree_allocator_) {} ~ObQueryEngine() { destroy(); } int init(); void destroy(); void pre_batch_destroy_keybtree(); // ===================== Ob Query Engine User Operation Interface ===================== - // The concurrency control alogorithm of query engine is as following steps: - // 1. Firstly, we use the atomic hashtable to ensure that only one thread can - // create the ObMvccRow and support efficient point select(through set()) - // 2. Then we can operate the ObMvccRow according to the operation semantics - // 3. After above operation, we need atomically insert the ObMvccRow into the - // btree to support the efficient range query(through ensure()) - int set(const ObMemtableKey *key, ObMvccRow *value); - int ensure(const ObMemtableKey *key, ObMvccRow *value); - // get() will use the hashtable to support fast point select + // get() looks up directly in the btree int get(const ObMemtableKey *parameter_key, ObMvccRow *&row, ObMemtableKey *returned_key); + // create_btree_kv() atomically inserts or gets from btree using a callback + int create_btree_kv(const ObMemtableKey *parameter_key, const ObMvccRowCreator &row_creator, ObMvccRow *&row); // scan() will use the btree to support fast range query int scan(const ObMemtableKey *start_key, const bool start_exclude, const ObMemtableKey *end_key, const bool end_exclude, ObIQueryEngineIterator *&ret_iter); @@ -202,12 +193,7 @@ class ObQueryEngine void check_cleanout(bool &is_all_cleanout, bool &is_all_delay_cleanout, int64_t &count); - // Dump the hash table and btree to the file. void dump2text(FILE *fd); - int dump_keyhash(FILE *fd) const; - // Btree statistics used for virtual table - int64_t hash_size() const; - int64_t hash_alloc_memory() const; int64_t btree_size() const; int64_t btree_alloc_memory() const; private: @@ -246,13 +232,11 @@ class ObQueryEngine DISALLOW_COPY_AND_ASSIGN(ObQueryEngine); bool is_inited_; - // allocator for keyhash and btree + // allocator for btree ObIAllocator &memstore_allocator_; BtreeNodeAllocator btree_allocator_; // The btree optimized for fast range scan KeyBtree keybtree_; - // The hashtable optimized for fast point select - KeyHash keyhash_; // Iterator allocator for read and estimation IteratorAlloc iter_alloc_; IteratorAlloc raw_iter_alloc_; diff --git a/src/storage/memtable/ob_memtable.cpp b/src/storage/memtable/ob_memtable.cpp index 625af45f7..eb90c52d2 100644 --- a/src/storage/memtable/ob_memtable.cpp +++ b/src/storage/memtable/ob_memtable.cpp @@ -1535,16 +1535,6 @@ int ObMemtable::row_compact(ObMvccRow *row, return ret; } -int64_t ObMemtable::get_hash_item_count() const -{ - return query_engine_.hash_size(); -} - -int64_t ObMemtable::get_hash_alloc_memory() const -{ - return query_engine_.hash_alloc_memory(); -} - int64_t ObMemtable::get_btree_item_count() const { return query_engine_.btree_size(); @@ -2131,8 +2121,6 @@ int ObMemtable::dump2text(const char *fname) TRANS_LOG(WARN, "convert key fail", K_(key), K(ret)); } else { fprintf(fd, "memtable: key=%s\n", key_ptr); - fprintf(fd, "hash_item_count=%ld, hash_alloc_size=%ld\n", - get_hash_item_count(), get_hash_alloc_memory()); fprintf(fd, "btree_item_count=%ld, btree_alloc_size=%ld\n", get_btree_item_count(), get_btree_alloc_memory()); query_engine_.dump2text(fd); @@ -2675,7 +2663,6 @@ int ObMemtable::mvcc_replay_(storage::ObStoreCtx &ctx, common::ObTimeGuard timeguard("ObMemtable::mvcc_replay_", 5 * 1000); if (OB_FAIL(mvcc_engine_.create_kv(key, - false, // is_insert &stored_key, value))) { TRANS_LOG(WARN, "prepare kv before lock fail", K(ret)); @@ -2683,9 +2670,6 @@ int ObMemtable::mvcc_replay_(storage::ObStoreCtx &ctx, } else if (OB_FAIL(mvcc_engine_.mvcc_replay(arg, res))) { TRANS_LOG(WARN, "mvcc replay fail", K(ret)); } else if (FALSE_IT(timeguard.click("mvcc_engine_.mvcc_replay"))) { - } else if (OB_FAIL(mvcc_engine_.ensure_kv(&stored_key, value))) { - TRANS_LOG(WARN, "prepare kv after lock fail", K(ret)); - } else if (FALSE_IT(timeguard.click("mvcc_engine_.ensure_kv"))) { } else if (OB_FAIL(mem_ctx->register_row_replay_cb(&stored_key, value, res.tx_node_, @@ -2733,9 +2717,6 @@ int ObMemtable::batch_mvcc_write_(const storage::ObTableIterParam ¶m, TRANS_LOG(WARN, "reserce kvs failed", K(ret)); } else if (OB_FAIL(mvcc_engine_.create_kvs(memtable_set_arg, memtable_key_generator, - // is_normal_insert - blocksstable::ObDmlFlag::DF_INSERT == writer_dml_flag - && !rows_info.need_find_all_duplicate_key(), stored_kvs))) { TRANS_LOG(WARN, "create kv failed", K(ret), K(tx_node_args)); } @@ -2832,13 +2813,7 @@ int ObMemtable::batch_mvcc_write_(const storage::ObTableIterParam ¶m, mvcc_results[i]); } } - // Step3: insert the stored key(pay attention to the life cycle of the - // stored key itself) and the mvcc row into the b+tree to support a better - // scan performance - } else if (OB_FAIL(mvcc_engine_.ensure_kv(&stored_kvs[i].key_, - stored_kvs[i].value_))) { - TRANS_LOG(WARN, "prepare kv after lock fail", K(ret)); - // Step4: remember the stored key for later callback registration(pay + // Step3: remember the stored key for later callback registration(pay // attention to the life cycle between the stored key and local allocated // memtable key) and value for later the follow-ups of mvcc-write } else { @@ -2877,11 +2852,9 @@ int ObMemtable::mvcc_write_(ObStoreCtx &ctx, ObMemtableCtx *mem_ctx = ctx.mvcc_acc_ctx_.get_mem_ctx(); transaction::ObTxSnapshot &snapshot = ctx.mvcc_acc_ctx_.snapshot_; - // Step1: create or get the memtable key and mvcc row from the hash table - // which ensuring the unqiueness of the key and value + // Step1: create or get the memtable key and mvcc row from the btree + // which ensuring the uniqueness of the key and value if (OB_FAIL(mvcc_engine_.create_kv(&memtable_key, - // is_insert - blocksstable::ObDmlFlag::DF_INSERT == tx_node_arg.data_->dml_flag_, &stored_key, value))) { TRANS_LOG(WARN, "create kv failed", K(ret), K(tx_node_arg), K(memtable_key)); @@ -2914,12 +2887,7 @@ int ObMemtable::mvcc_write_(ObStoreCtx &ctx, } else { TRANS_LOG(WARN, "mvcc write fail", K(ret)); } - // Step3: insert the stored key(pay attention to the life cycle of the - // stored key itself) and the mvcc row into the b+tree to support a better - // scan performance - } else if (OB_FAIL(mvcc_engine_.ensure_kv(&stored_key, value))) { - TRANS_LOG(WARN, "prepare kv after lock fail", K(ret)); - // Step4: remember the stored key for later callback registration(pay + // Step3: remember the stored key for later callback registration(pay // attention to the life cycle between the stored key and local allocated // memtable key) and value for later the follow-ups of mvcc-write } else { diff --git a/src/storage/memtable/ob_memtable.h b/src/storage/memtable/ob_memtable.h index 0c709071f..de5acc815 100644 --- a/src/storage/memtable/ob_memtable.h +++ b/src/storage/memtable/ob_memtable.h @@ -366,8 +366,6 @@ struct UpdateMergeInfoForMemtable int row_compact(ObMvccRow *value, const share::SCN snapshot_version, const int64_t flag); - int64_t get_hash_item_count() const; - int64_t get_hash_alloc_memory() const; int64_t get_btree_item_count() const; int64_t get_btree_alloc_memory() const; diff --git a/src/storage/memtable/ob_mt_hash.h b/src/storage/memtable/ob_mt_hash.h deleted file mode 100644 index 71184b788..000000000 --- a/src/storage/memtable/ob_mt_hash.h +++ /dev/null @@ -1,735 +0,0 @@ -/* - * Copyright (c) 2025 OceanBase. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#ifndef OCEANBASE_STRORAGE_MEMTABLE_OB_MT_HASH_ -#define OCEANBASE_STRORAGE_MEMTABLE_OB_MT_HASH_ - -#include "lib/allocator/ob_allocator.h" -//#include "lib/hash/ob_hash_common.h" -#include "storage/memtable/ob_memtable_key.h" -#include "storage/memtable/mvcc/ob_mvcc_row.h" // for dump row verbose - -namespace oceanbase -{ -namespace memtable -{ -typedef ObStoreRowkeyWrapper Key; -class ObMvccRow; - -// ---------------- common function ---------------- -// common functions should be packed into library directory, temporarily placed here - -// bit flip B63-B62-...-B1-B0 => B0-B1-...-B62-B63 -OB_INLINE uint64_t bitrev(uint64_t x) -{ - x = (((x & 0xaaaaaaaaaaaaaaaaUL) >> 1) | ((x & 0x5555555555555555UL) << 1)); - x = (((x & 0xccccccccccccccccUL) >> 2) | ((x & 0x3333333333333333UL) << 2)); - x = (((x & 0xf0f0f0f0f0f0f0f0UL) >> 4) | ((x & 0x0f0f0f0f0f0f0f0fUL) << 4)); - x = (((x & 0xff00ff00ff00ff00UL) >> 8) | ((x & 0x00ff00ff00ff00ffUL) << 8)); - x = (((x & 0xffff0000ffff0000UL) >> 16) | ((x & 0x0000ffff0000ffff) << 16)); - return ((x >> 32) | (x << 32)); -} - -// ---------------- helper functions ---------------- -// mark the lowest 2 bits -OB_INLINE static uint64_t mark_hash(const uint64_t key_hash) -{ - return (key_hash | 3ULL); -} - -OB_INLINE int64_t back2idx(const int64_t bucket_hash) -{ - return bitrev(bucket_hash & (~0x3)); -} - -// ---------------- node definition ---------------- -// the type of the array element is ObHashNode, -// the hash_ value is the bitrev of the array index -// the lowest 2 bits of hash_ represents is_bucket and is_filled respectively -struct ObHashNode -{ - ObHashNode *next_; - uint64_t hash_; - - ObHashNode() : next_(NULL), hash_(0) {} - ~ObHashNode() {} - - OB_INLINE void set_arr_idx(const int64_t idx) - { - ATOMIC_STORE(&hash_, (bitrev(idx))); - } - - // the lowest 2 bits of the node's hash_ value are special flags: - // 1. the lowest bit marks node type, 0 represents bucket_node and 1 represents mt_node - // 2. the second lowest is used to control visibility, 0 means invisible and 1 means visible - OB_INLINE void set_bucket_filled(const int64_t idx) - { - // idx is the array index of the node, and idx < arr_size << int64_max, so the lowest bit of hash_ is always 0 - // idx and node are in one-to-one respondence(such respondence is the same throughtout all threads) - // so overwrites hash_ directly regardless of the result of set_arr_idx - ATOMIC_STORE(&hash_, (bitrev(idx) | 2ULL)); - } - OB_INLINE bool is_bucket_filled() const - { - return (ATOMIC_LOAD(&hash_) & 2ULL); - } - OB_INLINE bool is_bucket_node() const - { - return 0 == (ATOMIC_LOAD(&hash_) & 1ULL); - } -}; - -// stores ObMemtableKey object instead of pointers -// long term consideration: -// stores pointer along with btree -// allocate once for ObMemtableKey object itself and it's obj array(objs_[0]) -// saves memory usage of hash/btree node(need one indirect reference for row_key comparison any way) -struct ObMtHashNode : public ObHashNode -{ - Key key_; - ObMvccRow *value_; - - ObMtHashNode() : key_(), value_(NULL) {} - ObMtHashNode(const Key &mtk) : key_(mtk), value_(NULL) - { - // the tail of data node is always 1 - hash_ = mark_hash(mtk.hash()); - } - ObMtHashNode(const Key &mtk, const ObMvccRow *value) - : key_(mtk), - value_(const_cast(value)) - { - hash_ = mark_hash(mtk.hash()); - } - ~ObMtHashNode() { value_ = NULL; } -}; - -// caller gurantees non-empty parameters -OB_INLINE static int compare_node(const ObHashNode *n1, const ObHashNode *n2, int &cmp) -{ - int ret = common::OB_SUCCESS; - cmp = 0; - if (OB_LIKELY(n1->hash_ > n2->hash_)) { - cmp = 1; - } else if (n1->hash_ < n2->hash_) { - cmp = -1; - } else if (n1->is_bucket_node()) { - // do nothing - } else { - bool is_equal = false; - const Key &mtk1 = (static_cast(n1))->key_; - const Key &mtk2 = (static_cast(n2))->key_; - if (OB_FAIL(mtk1.equal(mtk2, is_equal))) { - TRANS_LOG(ERROR, "failed to compare", KR(ret), K(mtk1), K(mtk2)); - } else if (is_equal) { - cmp = 0; - } else if (OB_FAIL(mtk1.compare(mtk2, cmp))) { - TRANS_LOG(ERROR, "failed to compare", KR(ret), K(mtk1), K(mtk2)); - } else { - // do nothing - } - } - return ret; -} - -// ---------------- array implementation ---------------- -template -class ObMtArrayBase -{ -private: - static const int64_t DIR_SIZE = PAGE_SIZE / sizeof(ObHashNode*); - static const int64_t SEG_SIZE = PAGE_SIZE / sizeof(ObHashNode); - static ObHashNode * const PLACE_HOLDER; -public: - static const int64_t ARRAY_CAPABILITY = DIR_SIZE * SEG_SIZE; -public: - explicit ObMtArrayBase(common::ObIAllocator &allocator) - : allocator_(allocator), - dir_(nullptr), - alloc_memory_(0) - { - } - ~ObMtArrayBase() { destroy(); } - void destroy() { - dir_ = nullptr; - alloc_memory_ = 0; - } - int64_t get_alloc_memory() const { return ATOMIC_LOAD(&alloc_memory_) + sizeof(*this); } - // 1. caller guraantees the validity of idx - // 2. allocate dir_/seg on demand - // 3. return the node pointer of the corresponding position of ret_node - OB_INLINE int at(const int64_t idx, ObHashNode *&ret_node) - { - int ret = common::OB_SUCCESS; - ObHashNode **dir = NULL; - ObHashNode *seg = NULL; - if (OB_FAIL(load_dir(dir))) { - } else if (OB_FAIL(load_seg(dir, idx / SEG_SIZE, seg))) { - } else { - ret_node = seg + (idx % SEG_SIZE); - } - return ret; - } - int64_t to_string(char *buf, int64_t buf_len) const - { - return snprintf(buf, buf_len, ", dir_=%p", dir_); - } -private: - // return values: - // OB_ALLOCATE_MEMORY_FAILED : no memory - // OB_ENTRY_NOT_EXIST : current node is allocating - // common::OB_SUCCESS : ret_dir is definitely valid - OB_INLINE int load_dir(ObHashNode **&ret_dir) - { - int ret = common::OB_SUCCESS; - ret_dir = ATOMIC_LOAD(&dir_); - if (OB_NOT_NULL(ret_dir) && OB_UNLIKELY(reinterpret_cast(PLACE_HOLDER) != ret_dir)) { - // good, do nothing - } else if (NULL == ret_dir) { - if (ATOMIC_BCAS(&dir_, NULL, reinterpret_cast(PLACE_HOLDER))) { - if (OB_NOT_NULL(ret_dir = reinterpret_cast(allocator_.alloc(PAGE_SIZE)))) { - ATOMIC_FAA(&alloc_memory_, PAGE_SIZE); - memset(ret_dir, 0, PAGE_SIZE); - if (OB_LIKELY(ATOMIC_BCAS(&dir_, reinterpret_cast(PLACE_HOLDER), ret_dir))) { - } else { - // place_holder is a lock - ret = common::OB_ERR_UNEXPECTED; - } - } else { - ret = common::OB_ALLOCATE_MEMORY_FAILED; - } - } else { - ret = common::OB_ENTRY_NOT_EXIST; - } - } else { - ret = common::OB_ENTRY_NOT_EXIST; - } - return ret; - } - - // return value: - // OB_ALLOCATE_MEMORY_FAILED : no memory - // OB_ENTRY_NOT_EXIST : current node is allocating - // common::OB_SUCCESS : ret_dir is definitely valid - OB_INLINE int load_seg(ObHashNode **dir, const int64_t seg_idx, ObHashNode *&ret_seg) - { - int ret = common::OB_SUCCESS; - ret_seg = ATOMIC_LOAD(dir + seg_idx); - if (OB_NOT_NULL(ret_seg) && OB_LIKELY(PLACE_HOLDER != ret_seg)) { - // good, do nothing - } else if (NULL == ret_seg) { - if (ATOMIC_BCAS(dir + seg_idx, NULL, PLACE_HOLDER)) { - if (OB_NOT_NULL(ret_seg = reinterpret_cast(allocator_.alloc(PAGE_SIZE)))) { - ATOMIC_FAA(&alloc_memory_, PAGE_SIZE); - memset(ret_seg, 0, PAGE_SIZE); // make sure all nodes are invalid - if (OB_LIKELY(ATOMIC_BCAS(dir + seg_idx, PLACE_HOLDER, ret_seg))) { - } else { - ret = common::OB_ERR_UNEXPECTED; - } - } else { - ret = common::OB_ALLOCATE_MEMORY_FAILED; - } - } else { - ret = common::OB_ENTRY_NOT_EXIST; - } - } else { - ret = common::OB_ENTRY_NOT_EXIST; - } - return ret; - } -private: - common::ObIAllocator &allocator_; - ObHashNode** dir_; - int64_t alloc_memory_; -}; - -template -ObHashNode * const ObMtArrayBase::PLACE_HOLDER = (ObHashNode *)0x1; - -class ObMtArray -{ -public: - explicit ObMtArray(common::ObIAllocator &allocator) - : small_arr_(allocator), - large_arr_(allocator) - { - } - ~ObMtArray() { destroy(); } - void destroy() - { - small_arr_.destroy(); - large_arr_.destroy(); - } - int64_t get_alloc_memory() const { return small_arr_.get_alloc_memory() + large_arr_.get_alloc_memory();} - // caller ensures idx is valid - OB_INLINE int at(const int64_t idx, ObHashNode *&ret_node) - { - int ret = common::OB_SUCCESS; - if (idx < SMALL_CAPABILITY) { - ret = small_arr_.at(idx, ret_node); - } else { - ret = large_arr_.at(idx, ret_node); - } - return ret; - } - int64_t to_string(char *buf, int64_t buf_len) const - { - int64_t len = small_arr_.to_string(buf, buf_len); - len += large_arr_.to_string(buf + len, buf_len - len); - return len; - } -private: - // TODO memstore_allocator misc - static const int64_t MY_NORMAL_BLOCK_SIZE = common::OB_MALLOC_NORMAL_BLOCK_SIZE; - static const int64_t MY_BIG_BLOCK_SIZE = common::OB_MALLOC_BIG_BLOCK_SIZE - 64; -public: - // SMALL_CAPABILITY 520,000, TOTAL_CAPABILITY 3.43 billion - static const int64_t SMALL_CAPABILITY = ObMtArrayBase::ARRAY_CAPABILITY; - static const int64_t TOTAL_CAPABILITY = ObMtArrayBase::ARRAY_CAPABILITY + SMALL_CAPABILITY; -private: - ObMtArrayBase small_arr_; - ObMtArrayBase large_arr_; -}; - -// ---------------- hash implementation ---------------- -// don't use generic: QueryEngine uses template, and instantiates with type ObMemtableKey, -// uses the type directly here -// consider to remove generic from QueryEgnine in the future -class ObMtHash -{ -private: - // bucket link of parent-child relation - static const int64_t GENEALOGY_LEN = 64; - struct Parent - { - Parent(): bucket_node_(nullptr), bucket_idx_(0) {} - ObHashNode *bucket_node_; - int64_t bucket_idx_; - TO_STRING_KV(KP(bucket_node_), K(bucket_idx_)); - }; - struct Genealogy - { - struct Parent list_[GENEALOGY_LEN]; - int64_t depth_; - - Genealogy() : depth_(0) {} - OB_INLINE void append_parent(ObHashNode *node, int64_t idx) - { - // depth_ is not more than 64 - if (OB_UNLIKELY(depth_ >= GENEALOGY_LEN)) { - TRANS_LOG_RET(ERROR, common::OB_ERR_UNEXPECTED, "unexpect, genealogy is too deep", K(depth_)); - } else { - list_[depth_].bucket_node_ = node; - list_[depth_].bucket_idx_ = idx; - } - ++depth_; - } - OB_INLINE ObHashNode* get_young() - { - return 0 == depth_ ? NULL : list_[0].bucket_node_; - } - TO_STRING_KV(K(depth_), K(list_)); - }; - -public: - explicit ObMtHash(common::ObIAllocator &allocator) - : allocator_(allocator), - arr_(allocator), - arr_size_(INIT_HASH_SIZE) - { - // tail_node_ is a sentinel node, there will be no problem - // even the data node has a hash_ value 0xFF..FF - tail_node_.hash_ = 0xFFFFFFFFFFFFFFFF; // can be any value - tail_node_.next_ = NULL; - zero_node_.set_bucket_filled(0); - zero_node_.next_ = &tail_node_; - } - ~ObMtHash() { destroy(); } - void destroy() - { - arr_.destroy(); - arr_size_ = INIT_HASH_SIZE; - tail_node_.hash_ = 0xFFFFFFFFFFFFFFFF; // can be any value - tail_node_.next_ = NULL; - zero_node_.set_bucket_filled(0); - zero_node_.next_ = &tail_node_; - } - int64_t get_arr_size() const { return ATOMIC_LOAD(&arr_size_); } - int64_t get_alloc_memory() const { return sizeof(*this) + arr_.get_alloc_memory() + get_arr_size() * sizeof(ObMtHashNode) - sizeof(arr_);} - // keep the interface unchanged, so as to not modify caller's code - int get(const Key *query_key, - ObMvccRow *&ret_value, - const Key *©_inner_key) - { - // This is for history database, which has plenty of data but merely modifies them, - // so as to avoid occupying unnecessary memory by empty memtable, otherwise - // looking up in hash index of a memtable would - // trigger allocating dir/seg(each memtabale is aat least 16KB) - int ret = common::OB_ENTRY_NOT_EXIST; - if (!is_empty()) { - ret = do_get(query_key, ret_value, copy_inner_key); - } - return ret; - } - - int get(const Key *query_key, ObMvccRow *&ret_value) - { - int ret = common::OB_ENTRY_NOT_EXIST; - const Key *trival_copy_inner_key = NULL; - if (!is_empty()) { - ret = do_get(query_key, ret_value, trival_copy_inner_key); - } - return ret; - } - - int insert(const Key *insert_key, const ObMvccRow *insert_value) - { - int ret = common::OB_SUCCESS; - const uint64_t insert_key_hash = mark_hash(insert_key->hash()); - const uint64_t insert_key_so_hash = bitrev(insert_key_hash); - ObHashNode *bucket_node = NULL; - Genealogy genealogy; - const int64_t arr_size = ATOMIC_LOAD(&arr_size_); - if (OB_FAIL(get_bucket_node(arr_size, insert_key_so_hash, bucket_node, genealogy))) { - // no memory, do nothing - } else { - ObHashNode *op_bucket_node = fill_bucket(bucket_node, genealogy); - if (OB_FAIL(insert_mt_node(insert_key, insert_key_hash, insert_value, op_bucket_node))) { - if (common::OB_ENTRY_EXIST != ret && common::OB_ALLOCATE_MEMORY_FAILED == ret) { - TRANS_LOG(WARN, "insert mt_node error", K(ret), K(insert_key), KP(insert_value)); - } - } - TRANS_LOG(TRACE, "insert", K(genealogy), KP(bucket_node), K(op_bucket_node)); - } - return ret; - } - - void dump_hash(FILE* fd, - const bool print_bucket, - const bool print_row_value, - const bool print_row_value_verbose) const - { - dump_meta_info(fd); - dump_list(fd, print_bucket, print_row_value, print_row_value_verbose); - } - -private: - OB_INLINE bool is_empty() const - { - return ATOMIC_LOAD(&(zero_node_.next_)) == &tail_node_; - } - OB_INLINE static int64_t get_arr_idx(const int64_t query_key_so_hash, - const int64_t bucket_count) - { - return (query_key_so_hash & (bucket_count - 1)); - } - - int get_bucket_node(const int64_t arr_size, - const uint64_t query_key_so_hash, - ObHashNode *&bucket_node, - Genealogy &genealogy) - { - int ret = common::OB_SUCCESS; - int64_t bucket_count = common::next_pow2(arr_size); - int64_t arr_idx = bucket_count; - // if the insert position is very forward and arr_size is big, there is no - // need to access the same element repeatedly when recursively looking up, just decrease by half. - // Note: bucket_count can decrease more rapidly, but as long as not access array elements, - // it's accesptabale to loop multiple times(< 64) to decreasse by half. - int64_t last_arr_idx = arr_idx; - - while (OB_SUCC(ret) && arr_idx > 0) { - // find parent bucket recursively, until find a filled bucket or reach zero node(always filled) - arr_idx = get_arr_idx(query_key_so_hash, bucket_count); - if (OB_UNLIKELY(0 == arr_idx)) { - bucket_node = &zero_node_; - } else if (arr_idx == last_arr_idx || arr_idx >= arr_size) { - bucket_count >>= 1; - } else { - last_arr_idx = arr_idx; - ret = arr_.at(arr_idx, bucket_node); - if (OB_SUCC(ret)) { - if (OB_UNLIKELY(!bucket_node->is_bucket_filled())) { - bucket_count >>= 1; - genealogy.append_parent(bucket_node, arr_idx); - } else { - break; - } - } else if (common::OB_ENTRY_NOT_EXIST == ret) { - // current node is allocating, take it as invisible, and go on looking up parent bucket_node - bucket_count >>= 1; - ret = common::OB_SUCCESS; - } else { - // no memory - } - } - } // end while - // TRANS_LOG(TRACE, "get_bucket_node", K(arr_size), K(bucket_count), K(query_key_so_hash), K(arr_idx), K(bucket_node), K(genealogy)); - return ret; - } - - OB_INLINE bool not_reach_list_tail(const ObHashNode *next) - { - return &tail_node_ != next; - } - - // returns the comparison result between target node and - // the first node which is greater than or equal to target node - OB_INLINE int search_sub_range_list(ObHashNode *bucket_node, - ObHashNode *target_node, - ObHashNode *&prev_node, - ObHashNode *&next_node, - int &cmp) - { - int ret = common::OB_SUCCESS; - cmp = 1; - prev_node = bucket_node; - // prev < target < next - // find the first node >= target_node or reaches the end of link list - while (not_reach_list_tail(next_node = ATOMIC_LOAD(&(prev_node->next_))) - && (OB_SUCC(compare_node(target_node, next_node, cmp))) - && cmp > 0) { - prev_node = next_node; - } - return ret; - } - - // the time and process of filling bucket - // each get/insert operation first looks up recursively until finding an already filled bucket. - // during the process, record the unfilled bucket by it's recursive order, and fill them serializably - // from the smallest idx. when filling bucket whith multiple threads, current thread need to wait until - // other threads has completed filling buckets. after filling, do insert/get from the last bucket - // Note: filling bucket is a lazy operation, if there is no insert/get operation - // within a sub range, bucket filling will not be triggered. - // fill_bucket returns the last bucket used for operation. - OB_INLINE ObHashNode* fill_bucket(ObHashNode *parent_bucket_node, Genealogy &genealogy) - { - ObHashNode *parent = parent_bucket_node; - ObHashNode *child = NULL; - for (int64_t pos = genealogy.depth_ - 1; pos >= 0; pos--) { - child = genealogy.list_[pos].bucket_node_; - fill_pair(parent, child, genealogy.list_[pos].bucket_idx_); - parent = child; - } - ObHashNode *op_bucket_node = genealogy.get_young(); - return ((NULL != op_bucket_node) ? op_bucket_node: parent_bucket_node); - } - - // repeat until success. if multiple threads fill the same bucket, - // only one thread is allowed to fill, and other threads will wait until success - void fill_pair(ObHashNode *parent_bucket_node, ObHashNode* child_bucket_node, int64_t child_bucket_idx) - { - if (ATOMIC_BCAS(&(child_bucket_node->next_), NULL, reinterpret_cast(0x02))) { - // one thread is responsible for filling the bucket - ObMtHashNode target_node; - target_node.set_arr_idx(child_bucket_idx); - while (true) { - ObHashNode *prev_node = NULL; - ObHashNode *next_node = NULL; - int cmp = 0; - int ret = common::OB_SUCCESS; - if (OB_FAIL(search_sub_range_list(parent_bucket_node, &target_node, prev_node, next_node, cmp))) { - break; - } - ATOMIC_STORE(&(child_bucket_node->next_), next_node); // next_nodewould not be NULL - child_bucket_node->set_arr_idx(child_bucket_idx); // fill hash_ and mark as invisible - if (OB_LIKELY(ATOMIC_LOAD(&(prev_node->next_)) == next_node) - && OB_LIKELY(ATOMIC_BCAS(&(prev_node->next_), next_node, child_bucket_node))) { - // make the bucket_node visible to look up queries - child_bucket_node->set_bucket_filled(child_bucket_idx); - break; - } else { - TRANS_LOG(TRACE, "try insert fill error", KP(prev_node), KP(child_bucket_node), - K(child_bucket_idx), K(next_node)); - } - } - } else { - // child_bucket_node is filling, wait for it's completion - while (!child_bucket_node->is_bucket_filled()) { - PAUSE(); - } - } - } - int do_get(const Key *query_key, - ObMvccRow *&ret_value, - const Key *©_inner_key) - { - int ret = common::OB_SUCCESS; - const uint64_t query_key_hash = mark_hash(query_key->hash()); - const uint64_t query_key_so_hash = bitrev(query_key_hash); - ObHashNode *bucket_node = NULL; - Genealogy genealogy; - const int64_t arr_size = ATOMIC_LOAD(&arr_size_); - if (OB_FAIL(get_bucket_node(arr_size, query_key_so_hash, bucket_node, genealogy))) { - // no memory, do nothing - } else { - ObHashNode *op_bucket_node = fill_bucket(bucket_node, genealogy); - ObMtHashNode target_node(*query_key); - ObHashNode *prev_node = NULL; - ObHashNode *next_node = NULL; - int cmp = 0; - if (OB_FAIL(search_sub_range_list(op_bucket_node, &target_node, prev_node, next_node, cmp))) { - // do nothing - } else if (0 == cmp) { - // find the key - ObMtHashNode *catched_node = static_cast(next_node); - copy_inner_key = &(catched_node->key_); - ret_value = catched_node->value_; - } else { - // the first node >= target is strictly greater than target, - // or searches the whole link list - ret = common::OB_ENTRY_NOT_EXIST; - } - TRANS_LOG(DEBUG, "do_get finish", K(ret), K(arr_size), K(query_key_so_hash), KP(bucket_node), - K(op_bucket_node), K(genealogy), KP(prev_node), KP(next_node)); - } - return ret; - } - int insert_mt_node(const Key *insert_key, - const int64_t insert_key_hash, - const ObMvccRow *insert_row, - ObHashNode *bucket_node) - { - ObMtHashNode target_node(*insert_key); - ObHashNode *prev_node = NULL; - ObHashNode *next_node = NULL; - ObMtHashNode *new_mt_node = NULL; // allocate at most once no matter how many times repeated - int ret = common::OB_EAGAIN; - while (common::OB_EAGAIN == ret) { - int cmp = 0; - if (OB_FAIL(search_sub_range_list(bucket_node, &target_node, prev_node, next_node, cmp))) { - // do nothing - } else if (FALSE_IT(ret = common::OB_EAGAIN)) { - // do nothing - } else if (0 == cmp) { - // mtk already exists - ret = common::OB_ENTRY_EXIST; - } else { - // alloc if necessary - if (OB_LIKELY(NULL == new_mt_node)) { - void *buf = NULL; - if (OB_NOT_NULL(buf = allocator_.alloc(sizeof(ObMtHashNode)))) { - new_mt_node = new (buf) ObMtHashNode(*insert_key, insert_row); - } - } - // insert new mt_node - if (OB_ISNULL(new_mt_node)) { - ret = common::OB_ALLOCATE_MEMORY_FAILED; - } else { - new_mt_node->next_ = next_node; - if (ATOMIC_LOAD(&(prev_node->next_)) == next_node - && ATOMIC_BCAS(&(prev_node->next_), next_node, new_mt_node)) { - try_extend(insert_key_hash); - ret = common::OB_SUCCESS; - } else { - // insert new_mt_node error - // retry - } - } - } - } - return ret; - } - - OB_INLINE void try_extend(const int64_t random_hash) - { - // use Bit[26~16] as random value - const int64_t FLUSH_LIMIT = (1 << 10); - if (OB_UNLIKELY(0 == ((random_hash >> 16) & (FLUSH_LIMIT - 1)))) { - ATOMIC_FAA(&arr_size_, FLUSH_LIMIT); - } - } - - void dump_meta_info(FILE *fd) const - { - int ret = OB_SUCCESS; - int64_t len = 0; - const int64_t DUMP_BUF_LEN = 8 * 1024; - HEAP_VAR(char[DUMP_BUF_LEN], buf) - { - memset(buf, 0, DUMP_BUF_LEN); - len = snprintf(buf, DUMP_BUF_LEN, "zero_node_=%p, tail_node_=%p, arr_size_=%ld", - &zero_node_, &tail_node_, arr_size_); - len += arr_.to_string(buf + len, DUMP_BUF_LEN - len); - fprintf(fd, "%s\n", buf); - } - } - - void dump_list(FILE *fd, - const bool print_bucket, - const bool print_row_value, - const bool print_row_value_verbose) const - { - int ret = OB_SUCCESS; - int64_t count = 0; - int64_t bucket_node_count = 0; - int64_t mt_node_count = 0; - const int64_t PRINT_LIMIT = 10L * 1000L * 1000L * 1000L; // large enough - ObHashNode *node = const_cast(&zero_node_); - const int64_t DUMP_BUF_LEN = 16 * 1024; - HEAP_VAR(char[DUMP_BUF_LEN], buf) - { - while (count < PRINT_LIMIT && node != &tail_node_) { - if (!node->is_bucket_node()) { - mt_node_count++; - fprintf(fd, "[%12ld] | mt_node, | addr=%14p | next_=%14p | hash_=%20lu=%16lx", - count, node, node->next_, node->hash_, node->hash_); - int64_t pos = 0; - memset(buf, 0, DUMP_BUF_LEN); - ObMtHashNode *mt_node = static_cast(node); - Key &mtk = mt_node->key_; - ObMvccRow *row = mt_node->value_; - pos = mtk.to_string(buf, DUMP_BUF_LEN); - if (pos < DUMP_BUF_LEN) { - (void)snprintf(buf, DUMP_BUF_LEN - pos, " | mvcc_row_addr=%p, ", row); - if (NULL != row && print_row_value) { - pos += row->to_string(buf, DUMP_BUF_LEN - pos, print_row_value_verbose); - } - } - fprintf(fd, "%s\n", buf); - } else if (print_bucket) { - bucket_node_count++; - fprintf(fd, "[%12ld] | bucket_node | addr=%14p | next_=%14p | hash_=%20lu=%16lx | back2idx=%ld\n", - count, node, node->next_, node->hash_, node->hash_, back2idx(node->hash_)); - } else { - // do nothing - } - node = node->next_; - count++; - } - if (node != &tail_node_) { - fprintf(fd, "ERROR dump_list terminated, bucket_node_count=%ld, mt_node_count=%ld, count=%ld\n", - bucket_node_count, mt_node_count, count); - } else { - fprintf(fd, "SUCCESS dump_list finish, bucket_node_count=%ld, mt_node_count=%ld, count=%ld\n", - bucket_node_count, mt_node_count, count); - } - } - } -private: - static const int64_t INIT_HASH_SIZE = 128; -private: - common::ObIAllocator &allocator_; - ObHashNode zero_node_; // for idx=0, always available - ObHashNode tail_node_; // tail node, a sentinel node, never be accessed - ObMtArray arr_; - int64_t arr_size_ CACHE_ALIGNED; // size of arr_ -}; - -} // namespace memtable -} // namespace oceanbase - -#endif diff --git a/unittest/storage/memtable/repeat_test_mt_hash.sh b/unittest/storage/memtable/repeat_test_mt_hash.sh deleted file mode 100755 index f889c2d98..000000000 --- a/unittest/storage/memtable/repeat_test_mt_hash.sh +++ /dev/null @@ -1,20 +0,0 @@ - -c=0 -echo "" > fail_page.log -while true; -do - echo "=============== $c ======" - echo "=============== $c ======" - echo "=============== $c ======" - rm mt_hash.log -f - echo "=============== $c ======" >> run.log - echo "=============== $c ======" >> run.log - echo "=============== $c ======" >> run.log - ./test_mt_hash >> run.log - if [ $? -ne 0 ]; - then - exit - fi - let c++ - tail -n 5000000 mt_hash.log | grep fail >> fail_page.log -done diff --git a/unittest/storage/memtable/test_mt_hash.cpp b/unittest/storage/memtable/test_mt_hash.cpp deleted file mode 100644 index ddbd55ed7..000000000 --- a/unittest/storage/memtable/test_mt_hash.cpp +++ /dev/null @@ -1,262 +0,0 @@ -/* - * Copyright (c) 2025 OceanBase. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include -#include "lib/allocator/ob_malloc.h" -#include "storage/memtable/ob_memtable_key.h" -#include "storage/memtable/ob_mt_hash.h" - -namespace oceanbase -{ -namespace unittest -{ -using namespace oceanbase::common; -using namespace oceanbase::memtable; - -class ObTestMemtableKey : public ObMemtableKey -{ -public: - ObTestMemtableKey(const uint64_t test_hash, const int64_t table_id) - { - hash_val_ = test_hash; - table_id_ = table_id; - } -}; - -class ObTestAllocator : public ObIAllocator -{ -public: - void *alloc(const int64_t size) - { - return malloc(size); - } -}; - -// global instance -ObTestAllocator allocator; -ObMtHash mt_hash(allocator); - -// ----------- insert thread ------------ -#define PERF_KV_COUNT (1000) -#define PERF_INSERT_ROW_ROW_ROW_ROW_ROW_ROW_THREAD_NUM 64 - -int64_t total_count = 0; -int64_t total_time = 0; - -void* do_insert(void* data) -{ - int param = *(int*)(data); - TRANS_LOG(INFO, "thread running", "tid", get_itid(), K(param)); - - int64_t count = 0; - const int64_t R = 10L * PERF_KV_COUNT; - int ret = OB_SUCCESS; - int64_t start_ts = ObTimeUtility::current_time(); - char *mtk_buf = NULL; - ObTestMemtableKey *mtk = NULL; - while (count++ < PERF_KV_COUNT && OB_SUCCESS == ret) { - int64_t table_id = param * R + count; - uint64_t hash = murmurhash(&table_id, sizeof(table_id), 0); - //ObTestMemtableKey mtk(hash, table_id); - mtk_buf = (char *)(allocator.alloc(sizeof(ObTestMemtableKey))); - mtk = new (mtk_buf) ObTestMemtableKey(hash, table_id); - ObMemtableKeyWrapper mtk_wrapper(mtk); - ObMvccRow *row = reinterpret_cast(table_id); - ret = mt_hash.insert(&mtk_wrapper, row); - } - int64_t end_ts = ObTimeUtility::current_time(); - ATOMIC_FAA(&total_count, count); - ATOMIC_FAA(&total_time, (end_ts - start_ts)); - if (OB_SUCCESS != ret) { - fprintf(stdout, "ret=%d, tid=%ld, elapse=%ld, count=%ld\n", ret, get_itid(), (end_ts - start_ts), count); - } - TRANS_LOG(INFO, "thread running", "tid", get_itid(), K(param)); - return NULL; -} - -// ------------ get thread ----------------- -#define PERF_GET_THREAD_NUM 64 -void* do_get(void* data) -{ - int param = *(int*)(data); - TRANS_LOG(INFO, "thread running", "tid", get_itid(), K(param)); - int64_t count = 0; - const int64_t R = 10L * PERF_KV_COUNT; - int ret = OB_SUCCESS; - int64_t start_ts = ObTimeUtility::current_time(); - ObMvccRow *ret_row = NULL; - char *mtk_buf = NULL; - ObTestMemtableKey *mtk = NULL; - while (count++ < PERF_KV_COUNT && OB_SUCCESS == ret) { - int64_t table_id = param * R + count; - uint64_t hash = murmurhash(&table_id, sizeof(table_id), 0); - mtk_buf = (char *)(allocator.alloc(sizeof(ObTestMemtableKey))); - mtk = new (mtk_buf) ObTestMemtableKey(hash, table_id); - ObMemtableKeyWrapper mtk_wrapper(mtk); - ret = mt_hash.get(&mtk_wrapper, ret_row); - } - int64_t end_ts = ObTimeUtility::current_time(); - ATOMIC_FAA(&total_count, count); - ATOMIC_FAA(&total_time, (end_ts - start_ts)); - if (OB_SUCCESS != ret) { - fprintf(stdout, "ret=%d, tid=%ld, elapse=%ld, count=%ld\n", ret, get_itid(), (end_ts - start_ts), count); - } - TRANS_LOG(INFO, "thread running", "tid", get_itid(), K(param)); - return NULL; -} - -TEST(TestMtHash, perf_test) -{ - { - TRANS_LOG(INFO, "insert perf test start"); - fprintf(stdout, "insert perf test start\n"); - pthread_t threads[PERF_INSERT_ROW_ROW_ROW_ROW_ROW_ROW_THREAD_NUM]; - int thread_param[PERF_INSERT_ROW_ROW_ROW_ROW_ROW_ROW_THREAD_NUM]; - int err = 0; - for (int i = 0; i < PERF_INSERT_ROW_ROW_ROW_ROW_ROW_ROW_THREAD_NUM; i++) { - thread_param[i] = i + 1; - err = pthread_create(threads + i, NULL, do_insert, &(thread_param[i])); - ASSERT_EQ(0, err); - } - for (int i = 0; i < PERF_INSERT_ROW_ROW_ROW_ROW_ROW_ROW_THREAD_NUM; i++) { - pthread_join(threads[i], NULL); - } - int64_t ops = (int64_t)(total_count / (total_time * 1.0L / 1000 / 1000)); - fprintf(stdout, "insert perf: total_count=%ld, total_time=%ld, OPST= %ld\n", total_count, total_time, ops); - ATOMIC_STORE(&total_time, 0); - ATOMIC_STORE(&total_count, 0); - TRANS_LOG(INFO, "insert perf test finish"); - fprintf(stdout, "insert perf test finish\n"); - } - - - { - sleep(5); - TRANS_LOG(INFO, "get perf test start"); - fprintf(stdout, "get perf test start\n"); - pthread_t threads[PERF_GET_THREAD_NUM]; - int thread_param[PERF_GET_THREAD_NUM]; - int err = 0; - for (int i = 0; i < PERF_INSERT_ROW_ROW_ROW_ROW_ROW_ROW_THREAD_NUM; i++) { - thread_param[i] = i + 1; - err = pthread_create(threads + i, NULL, do_get, &(thread_param[i])); - ASSERT_EQ(0, err); - } - for (int i = 0; i < PERF_GET_THREAD_NUM; i++) { - pthread_join(threads[i], NULL); - } - int64_t ops = (int64_t)(total_count / (total_time * 1.0L / 1000 / 1000)); - fprintf(stdout, "get perf: NO WARMUP total_count=%ld, total_time=%ld, OPST= %ld\n", total_count, total_time, ops); - fprintf(stdout, "start test warm get"); - ATOMIC_STORE(&total_time, 0); - ATOMIC_STORE(&total_count, 0); - - sleep(5); - for (int i = 0; i < PERF_GET_THREAD_NUM; i++) { - thread_param[i] = i + 1; - err = pthread_create(threads + i, NULL, do_get, &(thread_param[i])); - ASSERT_EQ(0, err); - } - for (int i = 0; i < PERF_GET_THREAD_NUM; i++) { - pthread_join(threads[i], NULL); - } - ops = (int64_t)(total_count / (total_time * 1.0L / 1000 / 1000)); - fprintf(stdout, "get perf: WARMUP total_count=%ld, total_time=%ld, OPST= %ld\n", total_count, total_time, ops); - } -} - -// ------------ multi thread mix test ---------------- -#define MIX_TEST_KV_COUNT (100) -#define MIX_TEST_THREAD_NUM 64 -void dump_mt_hash(const char* fname) -{ - fprintf(stdout, "begin dump mt_hash into %s\n", fname); - FILE *fd = NULL; - if (NULL == (fd = fopen(fname, "w"))) { - TRANS_LOG(ERROR, "open file fail", K(fname)); - } else { - mt_hash.dump_hash(fd, true, false, false); - } -} - -void* thread_routine(void* data) -{ - int param = *(int*)(data); - TRANS_LOG(INFO, "thread running", "tid", get_itid(), K(param)); - - int64_t count = 0; - int ins_ret = OB_SUCCESS; - int get_ret = OB_SUCCESS; - const int64_t R = MIX_TEST_KV_COUNT * 100; - char *mtk_buf = NULL; - ObTestMemtableKey *mtk = NULL; - while (count < MIX_TEST_KV_COUNT) { - int64_t table_id = param * R + ObRandom::rand(0, R); - uint64_t hash = murmurhash(&table_id, sizeof(table_id), 0); - mtk_buf = (char *)(allocator.alloc(sizeof(ObTestMemtableKey))); - mtk = new (mtk_buf) ObTestMemtableKey(hash, table_id); - ObMemtableKeyWrapper mtk_wrapper(mtk); - ObMvccRow *row = reinterpret_cast(table_id); - ins_ret = mt_hash.insert(&mtk_wrapper, row); - ObMvccRow *ret_row = NULL; - get_ret = mt_hash.get(&mtk_wrapper, ret_row); - OB_ASSERT(OB_SUCCESS == get_ret); - if (OB_SUCCESS == ins_ret) { - OB_ASSERT(ret_row == row); - } else { - } - if (REACH_TIME_INTERVAL(1 * 1000 * 1000)) { - fprintf(stdout, "param=%d, count=%ld\n", param, count); - } - count++; - } - fprintf(stdout, "test insert success, thread_param=%d, count=%ld\n", param, --count); - return NULL; -} - -TEST(TestMtHash, multi_thread_test) -{ - TRANS_LOG(INFO, "multi_thread_test start"); - pthread_t threads[MIX_TEST_THREAD_NUM]; - int thread_param[MIX_TEST_THREAD_NUM]; - int err = 0; - for (int i = 0; i < MIX_TEST_THREAD_NUM; i++) { - thread_param[i] = i + 1; - err = pthread_create(threads + i, NULL, thread_routine, &(thread_param[i])); - ASSERT_EQ(0, err); - } - for (int i = 0; i < MIX_TEST_THREAD_NUM; i++) { - pthread_join(threads[i], NULL); - } - - dump_mt_hash("dump_mt_hash.txt"); - - fprintf(stdout, "test insert success, main thread\n"); - TRANS_LOG(INFO, "multi_thread_test finish"); -} - -} // namespace unittest end -} // namespace oceanbase end - -int main(int argc, char **argv) -{ - OB_LOGGER.set_file_name("mt_hash.log", true); - OB_LOGGER.set_log_level("DEBUG"); - OB_LOGGER.set_log_level("TRACE"); - OB_LOGGER.set_log_level("INFO"); - ::testing::InitGoogleTest(&argc, argv); - return RUN_ALL_TESTS(); -}