Skip to content

Commit

Permalink
add some git fix code
Browse files Browse the repository at this point in the history
  • Loading branch information
yjiao committed Oct 15, 2017
1 parent 976807a commit dcdb8d2
Show file tree
Hide file tree
Showing 4 changed files with 118 additions and 15 deletions.
65 changes: 63 additions & 2 deletions ft/cachetable.cc
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,9 @@ PATENT RIGHTS GRANT:
#include <util/rwlock.h>
#include <util/status.h>
#include "ft/ule.h"

extern TOKULOGGER global_logger;

///////////////////////////////////////////////////////////////////////////////////
// Engine status
//
Expand Down Expand Up @@ -1080,6 +1083,7 @@ write_locked_pair_for_checkpoint(CACHETABLE ct, PAIR p, bool checkpoint_pending)
}
}


// On entry and exit: hold the pair's mutex (p->mutex)
// Method: take write lock
// maybe write out the node
Expand Down Expand Up @@ -4823,6 +4827,36 @@ void checkpointer::turn_on_pending_bits_partial() {
// we may end up clearing the pending bit before the
// current lock is ever released.
p->checkpoint_pending = true;


///////////////////////////////////////////
FTNODE node = (FTNODE) p->value_data;
if (p->dirty == false && node->unbound_insert_count > 0 && node->height == 0) {
printf("skip the pair=%p, unbound=%u, height=%d\n", p, node->unbound_insert_count, node->height);
for (int ii=0; ii<node->n_children; ii++) {
if(BP_STATE(node,ii) != PT_AVAIL) {
printf("partition ii=%d is not in memory\n", ii);
continue;
}
toku_list *unbound_msgs;
if (node->height > 0) {
unbound_msgs = &BNC(node,ii)->unbound_inserts;
} else {
unbound_msgs = &BLB(node,ii)->unbound_inserts;
}
struct toku_list *list = unbound_msgs->next;
while(list!=unbound_msgs) {
struct unbound_insert_entry *entry = toku_list_struct(list, struct unbound_insert_entry, node_list);
paranoid_invariant(entry->state == UBI_UNBOUND);
toku_mutex_lock(&global_logger->ubi_lock);
toku_list_remove(&entry->in_or_out);
toku_mutex_unlock(&global_logger->ubi_lock);
list = list->next;
}
}
}
///////////////////////////////////////////

if (m_list->m_pending_head) {
m_list->m_pending_head->pending_prev = p;
}
Expand Down Expand Up @@ -4860,6 +4894,35 @@ void checkpointer::turn_on_pending_bits() {
// we may end up clearing the pending bit before the
// current lock is ever released.
p->checkpoint_pending = true;

///////////////////////////////////////////
FTNODE node = (FTNODE) p->value_data;
if (p->dirty == false && node->unbound_insert_count > 0 && node->height == 0) {
printf("skip the pair=%p, unbound=%u, height=%d\n", p, node->unbound_insert_count, node->height);
for (int ii=0; ii<node->n_children; ii++) {
if(BP_STATE(node,ii) != PT_AVAIL) {
printf("partition ii=%d is not in memory\n", ii);
continue;
}
toku_list *unbound_msgs;
if (node->height > 0) {
unbound_msgs = &BNC(node,ii)->unbound_inserts;
} else {
unbound_msgs = &BLB(node,ii)->unbound_inserts;
}
struct toku_list *list = unbound_msgs->next;
while(list!=unbound_msgs) {
struct unbound_insert_entry *entry = toku_list_struct(list, struct unbound_insert_entry, node_list);
paranoid_invariant(entry->state == UBI_UNBOUND);
toku_mutex_lock(&global_logger->ubi_lock);
toku_list_remove(&entry->in_or_out);
toku_mutex_unlock(&global_logger->ubi_lock);
list = list->next;
}
}
}
///////////////////////////////////////////

if (m_list->m_pending_head) {
m_list->m_pending_head->pending_prev = p;
}
Expand Down Expand Up @@ -5337,8 +5400,6 @@ void cachefile_list::free_stale_data(evictor* ev) {
evict_pair_from_cachefile(p);
ev->remove_pair_attr(p->attr);
cachetable_free_pair(p);

// now that we have evicted something,
// let's check if the cachefile is needed anymore
if (m_stale_tail->cf_head == NULL) {
CACHEFILE cf_to_destroy = m_stale_tail;
Expand Down
9 changes: 7 additions & 2 deletions ft/ft-flusher.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1745,6 +1745,7 @@ setup_available_ftnode_partition(FTNODE node, int i) {

#ifdef DEAD_LEAF
static void ft_flush_blind_delete_basement(FTNODE node, int i, FT_MSG msg) {
uint64_t ubi_count;
call_flusher_thread_callback(flt_flush_completely_delete_basement);
switch(BP_STATE(node, i)) {
case PT_AVAIL:
Expand All @@ -1754,8 +1755,12 @@ static void ft_flush_blind_delete_basement(FTNODE node, int i, FT_MSG msg) {

BASEMENTNODE bn = BLB(node, i);
if(msg->msn.msn > bn ->max_msn_applied.msn){
call_flusher_thread_callback(flt_flush_delete_in_mem_basement);
call_flusher_thread_callback(flt_flush_delete_in_mem_basement);
ubi_count = bn->unbound_insert_count;
destroy_basement_node(bn);
if (ubi_count > 0) {
node->unbound_insert_count -= ubi_count;
}
BP_WORKDONE(node,i)=0;
setup_available_ftnode_partition(node, i);
BLB_MAX_MSN_APPLIED(node,i) = msg->msn;
Expand All @@ -1781,7 +1786,7 @@ static void ft_flush_blind_delete_basement(FTNODE node, int i, FT_MSG msg) {
} else {
status_inc(FT_MSN_DISCARDS, 1);
}
}
}
break;
case PT_ON_DISK:
//just init the bn
Expand Down
11 changes: 8 additions & 3 deletions ft/ft-ops.cc
Original file line number Diff line number Diff line change
Expand Up @@ -991,6 +991,7 @@ void toku_ftnode_clone_callback(
toku_assert_entire_node_in_memory(node);
FT ft = static_cast<FT>(write_extraargs);
FTNODE XCALLOC(cloned_node);

if (node->height == 0) {
// set header stats, must be done before rebalancing
ftnode_update_disk_stats(node, ft, for_checkpoint);
Expand Down Expand Up @@ -1040,7 +1041,7 @@ void toku_ftnode_clone_callback(
}
*clone_size = ftnode_memory_size(cloned_node);
*cloned_value_data = cloned_node;
cloned_node->ct_pair = node->ct_pair;
cloned_node->ct_pair = node->ct_pair;
toku_ft_node_unbound_inserts_validation(node);
//toku_ft_node_unbound_inserts_validation(cloned_node);
}
Expand Down Expand Up @@ -1341,10 +1342,15 @@ compress_internal_node_partition(FTNODE node, int i, enum toku_compression_metho

void toku_evict_bn_from_memory(FTNODE node, int childnum, FT h) {
// free the basement node
uint64_t ubi_count;
assert(!node->dirty);
BASEMENTNODE bn = BLB(node, childnum);
ubi_count = bn->unbound_insert_count;
toku_ft_decrease_stats(&h->in_memory_stats, bn->stat64_delta);
destroy_basement_node(bn);
if (ubi_count > 0) {
node->unbound_insert_count -= ubi_count;
}
set_BNULL(node, childnum);
BP_STATE(node, childnum) = PT_ON_DISK;
}
Expand Down Expand Up @@ -2086,7 +2092,7 @@ toku_ft_nonleaf_append_child(FTNODE node, FTNODE child, const DBT *pivotkey) {
void
toku_ft_bn_apply_cmd_once (
BASEMENTNODE bn,
struct unbound_insert_entry *ubi_entry,
struct unbound_insert_entry *UU(ubi_entry),
const FT_MSG cmd,
uint32_t idx,
LEAFENTRY le,
Expand Down Expand Up @@ -3585,7 +3591,6 @@ toku_ft_node_put_cmd (

// SOSP TODO: update unbound_msg_count at node level. let lower-levels update their own partition counts


toku_ft_node_unbound_inserts_validation(node, cmd, __LINE__);
if (node->height==0) {
toku_ft_leaf_apply_cmd(ft, compare_fun, update_fun, desc, ubi_entry, node, target_childnum, cmd, gc_info, nullptr, stats_to_update);
Expand Down
48 changes: 40 additions & 8 deletions ft/ft_node-serialize.cc
Original file line number Diff line number Diff line change
Expand Up @@ -707,8 +707,8 @@ rebalance_ftnode_leaf(FTNODE node, unsigned int basementnodesize)
BLB_MAX_MSN_APPLIED(node,i) = max_msn;
baseindex_this_bn += num_les_to_copy; // set to index of next bn
}
node->max_msn_applied_to_node_on_disk = max_msn;

node->max_msn_applied_to_node_on_disk = max_msn;
//SOSP: Jun : hack on rebalance, since rebalancing only happens before checkpoint or serialization
//either case is right before writing back.
//there is no chance for split or merge to mess up with unbound entries then we can solely use the bn
Expand All @@ -730,13 +730,20 @@ rebalance_ftnode_leaf(FTNODE node, unsigned int basementnodesize)
old_bns[i]->unbound_insert_count = 0;
}
BLB(node,0)->unbound_insert_count = n_unbound_entry;
}
}
node->unbound_insert_count = n_unbound_entry; // destroy buffers of old mempools
//toku_ft_node_empty_unbound_inserts_validation(node);
//toku_ft_node_unbound_inserts_validation(node);


for (uint32_t i = 0; i < num_orig_basements; i++) {
toku_list * head = &old_bns[i]->unbound_inserts;
if (toku_list_empty(head)) {
if (old_bns[i]->unbound_insert_count != 0) {
printf("i=%d, unbound_insert_count=%d\n", i, old_bns[i]->unbound_insert_count);
paranoid_invariant(old_bns[i]->unbound_insert_count == 0);
}
}
destroy_basement_node(old_bns[i]);
}

Expand Down Expand Up @@ -1276,16 +1283,41 @@ NONLEAF_CHILDINFO toku_clone_nl(NONLEAF_CHILDINFO orig_childinfo) {
return cn;
}

static void toku_jun_debug(void) {
printf("oh nooo\n");
}
extern TOKULOGGER global_logger;

void destroy_basement_node (BASEMENTNODE bn)
{
bn->data_buffer.destroy();

if (!toku_list_empty(&bn->unbound_inserts)) {
toku_list *unbound_msgs;
unbound_msgs = &(bn->unbound_inserts);

struct toku_list *list = unbound_msgs->next;

while (list != unbound_msgs) {
struct unbound_insert_entry *entry = toku_list_struct(list, struct unbound_insert_entry, node_list);
paranoid_invariant(entry->state == UBI_UNBOUND);
toku_mutex_lock(&global_logger->ubi_lock);
toku_list_remove(&entry->in_or_out);
toku_mutex_unlock(&global_logger->ubi_lock);
list = list->next;
}

list = unbound_msgs->next;

while (list != unbound_msgs) {
struct unbound_insert_entry *entry = toku_list_struct(list, struct unbound_insert_entry, node_list);
paranoid_invariant(entry->state == UBI_UNBOUND);

toku_list *to_be_removed = list;
list = list->next;
toku_list_remove(to_be_removed);
bn->unbound_insert_count--;
}
}

paranoid_invariant(bn->unbound_insert_count == 0);
if(!toku_list_empty(&bn->unbound_inserts)) {
toku_jun_debug() ;
}
paranoid_invariant(toku_list_empty(&bn->unbound_inserts));
toku_free(bn);
}
Expand Down

0 comments on commit dcdb8d2

Please sign in to comment.