From dcdb8d2c2d8374d1102fc2619b477eef8c612998 Mon Sep 17 00:00:00 2001 From: yjiao Date: Sun, 15 Oct 2017 16:45:02 -0400 Subject: [PATCH] add some git fix code --- ft/cachetable.cc | 65 +++++++++++++++++++++++++++++++++++++++-- ft/ft-flusher.cc | 9 ++++-- ft/ft-ops.cc | 11 +++++-- ft/ft_node-serialize.cc | 48 +++++++++++++++++++++++++----- 4 files changed, 118 insertions(+), 15 deletions(-) diff --git a/ft/cachetable.cc b/ft/cachetable.cc index 69c295136..095dec2c4 100644 --- a/ft/cachetable.cc +++ b/ft/cachetable.cc @@ -116,6 +116,9 @@ PATENT RIGHTS GRANT: #include #include #include "ft/ule.h" + +extern TOKULOGGER global_logger; + /////////////////////////////////////////////////////////////////////////////////// // Engine status // @@ -1080,6 +1083,7 @@ write_locked_pair_for_checkpoint(CACHETABLE ct, PAIR p, bool checkpoint_pending) } } + // On entry and exit: hold the pair's mutex (p->mutex) // Method: take write lock // maybe write out the node @@ -4823,6 +4827,36 @@ void checkpointer::turn_on_pending_bits_partial() { // we may end up clearing the pending bit before the // current lock is ever released. p->checkpoint_pending = true; + + +/////////////////////////////////////////// + FTNODE node = (FTNODE) p->value_data; + if (p->dirty == false && node->unbound_insert_count > 0 && node->height == 0) { + printf("skip the pair=%p, unbound=%u, height=%d\n", p, node->unbound_insert_count, node->height); + for (int ii=0; iin_children; ii++) { + if(BP_STATE(node,ii) != PT_AVAIL) { + printf("partition ii=%d is not in memory\n", ii); + continue; + } + toku_list *unbound_msgs; + if (node->height > 0) { + unbound_msgs = &BNC(node,ii)->unbound_inserts; + } else { + unbound_msgs = &BLB(node,ii)->unbound_inserts; + } + struct toku_list *list = unbound_msgs->next; + while(list!=unbound_msgs) { + struct unbound_insert_entry *entry = toku_list_struct(list, struct unbound_insert_entry, node_list); + paranoid_invariant(entry->state == UBI_UNBOUND); + toku_mutex_lock(&global_logger->ubi_lock); + toku_list_remove(&entry->in_or_out); + toku_mutex_unlock(&global_logger->ubi_lock); + list = list->next; + } + } + } +/////////////////////////////////////////// + if (m_list->m_pending_head) { m_list->m_pending_head->pending_prev = p; } @@ -4860,6 +4894,35 @@ void checkpointer::turn_on_pending_bits() { // we may end up clearing the pending bit before the // current lock is ever released. p->checkpoint_pending = true; + +/////////////////////////////////////////// + FTNODE node = (FTNODE) p->value_data; + if (p->dirty == false && node->unbound_insert_count > 0 && node->height == 0) { + printf("skip the pair=%p, unbound=%u, height=%d\n", p, node->unbound_insert_count, node->height); + for (int ii=0; iin_children; ii++) { + if(BP_STATE(node,ii) != PT_AVAIL) { + printf("partition ii=%d is not in memory\n", ii); + continue; + } + toku_list *unbound_msgs; + if (node->height > 0) { + unbound_msgs = &BNC(node,ii)->unbound_inserts; + } else { + unbound_msgs = &BLB(node,ii)->unbound_inserts; + } + struct toku_list *list = unbound_msgs->next; + while(list!=unbound_msgs) { + struct unbound_insert_entry *entry = toku_list_struct(list, struct unbound_insert_entry, node_list); + paranoid_invariant(entry->state == UBI_UNBOUND); + toku_mutex_lock(&global_logger->ubi_lock); + toku_list_remove(&entry->in_or_out); + toku_mutex_unlock(&global_logger->ubi_lock); + list = list->next; + } + } + } +/////////////////////////////////////////// + if (m_list->m_pending_head) { m_list->m_pending_head->pending_prev = p; } @@ -5337,8 +5400,6 @@ void cachefile_list::free_stale_data(evictor* ev) { evict_pair_from_cachefile(p); ev->remove_pair_attr(p->attr); cachetable_free_pair(p); - - // now that we have evicted something, // let's check if the cachefile is needed anymore if (m_stale_tail->cf_head == NULL) { CACHEFILE cf_to_destroy = m_stale_tail; diff --git a/ft/ft-flusher.cc b/ft/ft-flusher.cc index 5490a1e77..a56529bf4 100644 --- a/ft/ft-flusher.cc +++ b/ft/ft-flusher.cc @@ -1745,6 +1745,7 @@ setup_available_ftnode_partition(FTNODE node, int i) { #ifdef DEAD_LEAF static void ft_flush_blind_delete_basement(FTNODE node, int i, FT_MSG msg) { + uint64_t ubi_count; call_flusher_thread_callback(flt_flush_completely_delete_basement); switch(BP_STATE(node, i)) { case PT_AVAIL: @@ -1754,8 +1755,12 @@ static void ft_flush_blind_delete_basement(FTNODE node, int i, FT_MSG msg) { BASEMENTNODE bn = BLB(node, i); if(msg->msn.msn > bn ->max_msn_applied.msn){ - call_flusher_thread_callback(flt_flush_delete_in_mem_basement); + call_flusher_thread_callback(flt_flush_delete_in_mem_basement); + ubi_count = bn->unbound_insert_count; destroy_basement_node(bn); + if (ubi_count > 0) { + node->unbound_insert_count -= ubi_count; + } BP_WORKDONE(node,i)=0; setup_available_ftnode_partition(node, i); BLB_MAX_MSN_APPLIED(node,i) = msg->msn; @@ -1781,7 +1786,7 @@ static void ft_flush_blind_delete_basement(FTNODE node, int i, FT_MSG msg) { } else { status_inc(FT_MSN_DISCARDS, 1); } - } + } break; case PT_ON_DISK: //just init the bn diff --git a/ft/ft-ops.cc b/ft/ft-ops.cc index 7f6f2a169..a65070c2e 100644 --- a/ft/ft-ops.cc +++ b/ft/ft-ops.cc @@ -991,6 +991,7 @@ void toku_ftnode_clone_callback( toku_assert_entire_node_in_memory(node); FT ft = static_cast(write_extraargs); FTNODE XCALLOC(cloned_node); + if (node->height == 0) { // set header stats, must be done before rebalancing ftnode_update_disk_stats(node, ft, for_checkpoint); @@ -1040,7 +1041,7 @@ void toku_ftnode_clone_callback( } *clone_size = ftnode_memory_size(cloned_node); *cloned_value_data = cloned_node; - cloned_node->ct_pair = node->ct_pair; + cloned_node->ct_pair = node->ct_pair; toku_ft_node_unbound_inserts_validation(node); //toku_ft_node_unbound_inserts_validation(cloned_node); } @@ -1341,10 +1342,15 @@ compress_internal_node_partition(FTNODE node, int i, enum toku_compression_metho void toku_evict_bn_from_memory(FTNODE node, int childnum, FT h) { // free the basement node + uint64_t ubi_count; assert(!node->dirty); BASEMENTNODE bn = BLB(node, childnum); + ubi_count = bn->unbound_insert_count; toku_ft_decrease_stats(&h->in_memory_stats, bn->stat64_delta); destroy_basement_node(bn); + if (ubi_count > 0) { + node->unbound_insert_count -= ubi_count; + } set_BNULL(node, childnum); BP_STATE(node, childnum) = PT_ON_DISK; } @@ -2086,7 +2092,7 @@ toku_ft_nonleaf_append_child(FTNODE node, FTNODE child, const DBT *pivotkey) { void toku_ft_bn_apply_cmd_once ( BASEMENTNODE bn, - struct unbound_insert_entry *ubi_entry, + struct unbound_insert_entry *UU(ubi_entry), const FT_MSG cmd, uint32_t idx, LEAFENTRY le, @@ -3585,7 +3591,6 @@ toku_ft_node_put_cmd ( // SOSP TODO: update unbound_msg_count at node level. let lower-levels update their own partition counts - toku_ft_node_unbound_inserts_validation(node, cmd, __LINE__); if (node->height==0) { toku_ft_leaf_apply_cmd(ft, compare_fun, update_fun, desc, ubi_entry, node, target_childnum, cmd, gc_info, nullptr, stats_to_update); diff --git a/ft/ft_node-serialize.cc b/ft/ft_node-serialize.cc index 3d6819fda..d986d15f8 100644 --- a/ft/ft_node-serialize.cc +++ b/ft/ft_node-serialize.cc @@ -707,8 +707,8 @@ rebalance_ftnode_leaf(FTNODE node, unsigned int basementnodesize) BLB_MAX_MSN_APPLIED(node,i) = max_msn; baseindex_this_bn += num_les_to_copy; // set to index of next bn } - node->max_msn_applied_to_node_on_disk = max_msn; + node->max_msn_applied_to_node_on_disk = max_msn; //SOSP: Jun : hack on rebalance, since rebalancing only happens before checkpoint or serialization //either case is right before writing back. //there is no chance for split or merge to mess up with unbound entries then we can solely use the bn @@ -730,13 +730,20 @@ rebalance_ftnode_leaf(FTNODE node, unsigned int basementnodesize) old_bns[i]->unbound_insert_count = 0; } BLB(node,0)->unbound_insert_count = n_unbound_entry; - } + } node->unbound_insert_count = n_unbound_entry; // destroy buffers of old mempools //toku_ft_node_empty_unbound_inserts_validation(node); //toku_ft_node_unbound_inserts_validation(node); for (uint32_t i = 0; i < num_orig_basements; i++) { + toku_list * head = &old_bns[i]->unbound_inserts; + if (toku_list_empty(head)) { + if (old_bns[i]->unbound_insert_count != 0) { + printf("i=%d, unbound_insert_count=%d\n", i, old_bns[i]->unbound_insert_count); + paranoid_invariant(old_bns[i]->unbound_insert_count == 0); + } + } destroy_basement_node(old_bns[i]); } @@ -1276,16 +1283,41 @@ NONLEAF_CHILDINFO toku_clone_nl(NONLEAF_CHILDINFO orig_childinfo) { return cn; } -static void toku_jun_debug(void) { - printf("oh nooo\n"); -} +extern TOKULOGGER global_logger; + void destroy_basement_node (BASEMENTNODE bn) { bn->data_buffer.destroy(); + + if (!toku_list_empty(&bn->unbound_inserts)) { + toku_list *unbound_msgs; + unbound_msgs = &(bn->unbound_inserts); + + struct toku_list *list = unbound_msgs->next; + + while (list != unbound_msgs) { + struct unbound_insert_entry *entry = toku_list_struct(list, struct unbound_insert_entry, node_list); + paranoid_invariant(entry->state == UBI_UNBOUND); + toku_mutex_lock(&global_logger->ubi_lock); + toku_list_remove(&entry->in_or_out); + toku_mutex_unlock(&global_logger->ubi_lock); + list = list->next; + } + + list = unbound_msgs->next; + + while (list != unbound_msgs) { + struct unbound_insert_entry *entry = toku_list_struct(list, struct unbound_insert_entry, node_list); + paranoid_invariant(entry->state == UBI_UNBOUND); + + toku_list *to_be_removed = list; + list = list->next; + toku_list_remove(to_be_removed); + bn->unbound_insert_count--; + } + } + paranoid_invariant(bn->unbound_insert_count == 0); - if(!toku_list_empty(&bn->unbound_inserts)) { - toku_jun_debug() ; - } paranoid_invariant(toku_list_empty(&bn->unbound_inserts)); toku_free(bn); }