Skip to content

Commit

Permalink
fixed the algorithm to cope with some corner cases triggered by stres…
Browse files Browse the repository at this point in the history
…s tests

the library now looks really safe and has been tested under heavy concurrency load
  • Loading branch information
Andrea Guzzo committed Jun 5, 2014
1 parent 43b80ee commit b5bec63
Showing 1 changed file with 99 additions and 65 deletions.
164 changes: 99 additions & 65 deletions src/queue.c
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ queue_entry_t *help_insert(queue_entry_t *prev, queue_entry_t *entry)
queue_entry_t *prev2 = get_node_ptr(deref_link(prev->refcnt, &prev->next));
if (prev2 == NULL) {
if (last != NULL) {
//mark_prev(prev);
mark_prev(prev);
queue_entry_t *next2 = get_node_ptr(deref_link_d(prev->refcnt, &prev->next));
if (next2) {
if (ATOMIC_CMPXCHG(last->next, ATOMIC_READ(prev->node), ATOMIC_READ(next2->node))) {
Expand Down Expand Up @@ -239,7 +239,7 @@ help_delete(queue_entry_t *entry)
if (prev == next)
break;
if (next && REFCNT_IS_MARKED(next->prev)) {
//mark_prev(next);
mark_prev(next);
queue_entry_t *next2 = get_node_ptr(deref_link_d(next->refcnt, &next->next));
release_ref(next->refcnt, ATOMIC_READ(next->node));
next = next2;
Expand All @@ -248,7 +248,7 @@ help_delete(queue_entry_t *entry)
queue_entry_t *prev2 = get_node_ptr(deref_link(prev->refcnt, &prev->next));
if (prev2 == NULL) {
if (last != NULL) {
//mark_prev(prev);
mark_prev(prev);
queue_entry_t *next2 = get_node_ptr(deref_link_d(prev->refcnt, &prev->next));
if (next2) {
if (ATOMIC_CMPXCHG(last->next, ATOMIC_READ(prev->node), ATOMIC_READ(next2->node))) {
Expand All @@ -274,9 +274,17 @@ help_delete(queue_entry_t *entry)
continue;
}
if (ATOMIC_CMPXCHG(prev->next, ATOMIC_READ(entry->node), ATOMIC_READ(next->node))) {
refcnt_node_t *next_prev = ATOMIC_READ(next->prev);
if (!ATOMIC_CMPXCHG(next->prev, next_prev, ATOMIC_READ(prev->node))) {
next_prev = ATOMIC_READ(next->prev);
}
if (REFCNT_MARK_OFF(next_prev) != ATOMIC_READ(prev2->node))
release_ref(next->refcnt, next_prev);
release_ref(prev2->refcnt, ATOMIC_READ(prev2->node));
if (next)
retain_ref(next->refcnt, ATOMIC_READ(next->node));

retain_ref(next->refcnt, ATOMIC_READ(next->node));
retain_ref(prev->refcnt, prev->node);

release_ref(entry->refcnt, ATOMIC_READ(entry->node));
break;
}
Expand All @@ -295,7 +303,7 @@ help_delete(queue_entry_t *entry)
/*
* Insert a queue_entry_t at the beginning of a queue (or at the top if the stack)
*/
inline int
int
queue_push_left(queue_t *q, void *value)
{
queue_entry_t *entry = create_entry(q->refcnt);
Expand All @@ -306,38 +314,36 @@ queue_push_left(queue_t *q, void *value)

queue_entry_t *prev = ATOMIC_READ(q->head);

retain_ref(q->refcnt, prev->node);
retain_ref(q->refcnt, ATOMIC_READ(prev->node));

queue_entry_t *next = get_node_ptr(deref_link(prev->refcnt, &prev->next));
while (1) {
queue_entry_t *next = get_node_ptr(deref_link(prev->refcnt, &prev->next));
if (!next)
continue;

if (!next) {
destroy_entry(entry);
return -1;
}
store_ref(entry->refcnt, &entry->prev, ATOMIC_READ(prev->node));
store_ref(entry->refcnt, &entry->next, ATOMIC_READ(next->node));

while (next) {
refcnt_node_t *link = ATOMIC_READ(prev->node);
entry->prev = ATOMIC_READ(prev->node);
entry->next = ATOMIC_READ(next->node);
if (ATOMIC_CMPXCHG(prev->next, ATOMIC_READ(next->node), ATOMIC_READ(entry->node))) {
if (ATOMIC_CMPXCHG(next->prev, REFCNT_MARK_OFF(entry->prev), entry->node)) {
release_ref(prev->refcnt, ATOMIC_READ(prev->node));
retain_ref(entry->refcnt, ATOMIC_READ(entry->node));
while (!ATOMIC_CMPXCHG(next->prev, REFCNT_MARK_OFF(link), ATOMIC_READ(entry->node))) {
release_ref(prev->refcnt, prev->node);
next = get_node_ptr(deref_link(prev->refcnt, &prev->next));
continue;
while (!ATOMIC_CMPXCHG(prev->next, ATOMIC_READ(next->node), ATOMIC_READ(entry->node))) {
queue_entry_t *next2 = get_node_ptr(deref_link(prev->refcnt, &prev->next));
release_ref(next->refcnt, next->node);
next = next2;
store_ref(entry->refcnt, &entry->next, ATOMIC_READ(next->node));
}
release_ref(next->refcnt, ATOMIC_READ(next->node));
retain_ref(entry->refcnt, ATOMIC_READ(entry->node));
release_ref(next->refcnt, ATOMIC_READ(next->node));
break;
}

release_ref(next->refcnt, ATOMIC_READ(next->node));
next = get_node_ptr(deref_link(next->refcnt, &next->prev));

if (next)
release_ref(next->refcnt, ATOMIC_READ(next->node));
}

ATOMIC_INCREMENT(q->length, 1);
if (next)
release_ref(next->refcnt, ATOMIC_READ(next->node));
release_ref(prev->refcnt, ATOMIC_READ(prev->node));
return 0;
}
Expand All @@ -358,34 +364,33 @@ queue_push_right(queue_t *q, void *value)

retain_ref(next->refcnt, ATOMIC_READ(next->node));

queue_entry_t *prev = get_node_ptr(deref_link(next->refcnt, &next->prev));
while (1) {
queue_entry_t *prev = get_node_ptr(deref_link(next->refcnt, &next->prev));
if (!prev)
continue;

if (!prev) {
destroy_entry(entry);
return -1;
}
store_ref(entry->refcnt, &entry->next, ATOMIC_READ(next->node));
store_ref(entry->refcnt, &entry->prev, ATOMIC_READ(prev->node));

while (prev) {
refcnt_node_t *link = ATOMIC_READ(next->node);
entry->prev = ATOMIC_READ(prev->node);
entry->next = ATOMIC_READ(next->node);
if (ATOMIC_CMPXCHG(next->prev, ATOMIC_READ(prev->node), ATOMIC_READ(entry->node))) {
if (ATOMIC_CMPXCHG(prev->next, REFCNT_MARK_OFF(entry->next), entry->node)) {
release_ref(next->refcnt, ATOMIC_READ(next->node));
retain_ref(entry->refcnt, ATOMIC_READ(entry->node));
while (!ATOMIC_CMPXCHG(prev->next, REFCNT_MARK_OFF(link), ATOMIC_READ(entry->node))) {
while (!ATOMIC_CMPXCHG(next->prev, ATOMIC_READ(prev->node), ATOMIC_READ(entry->node))) {
queue_entry_t *prev2 = get_node_ptr(deref_link(next->refcnt, &next->prev));
release_ref(prev->refcnt, prev->node);
prev = get_node_ptr(deref_link(next->refcnt, &next->prev));
continue;
prev = prev2;
store_ref(entry->refcnt, &entry->prev, ATOMIC_READ(prev->node));
}
release_ref(prev->refcnt, prev->node);
retain_ref(entry->refcnt, ATOMIC_READ(entry->node));
release_ref(prev->refcnt, ATOMIC_READ(prev->node));
break;
}

release_ref(prev->refcnt, ATOMIC_READ(prev->node));
prev = get_node_ptr(deref_link(next->refcnt, &next->prev));
if (prev)
release_ref(prev->refcnt, ATOMIC_READ(prev->node));
}
ATOMIC_INCREMENT(q->length, 1);
if (prev)
release_ref(prev->refcnt, ATOMIC_READ(prev->node));
release_ref(next->refcnt, ATOMIC_READ(next->node));
return 0;
}
Expand All @@ -408,37 +413,51 @@ queue_pop_left(queue_t *q)
retain_ref(prev->refcnt, ATOMIC_READ(prev->node));
while(1) {
entry = get_node_ptr(deref_link(prev->refcnt, &prev->next));
if (!entry)
continue;

if (ATOMIC_READ(entry->prev) != ATOMIC_READ(prev->node)) {
release_ref(entry->refcnt, ATOMIC_READ(entry->node));
continue;
}
if (!entry || entry == ATOMIC_READ(q->tail)) {
if (entry)
release_ref(q->refcnt, ATOMIC_READ(entry->node));
release_ref(q->refcnt, ATOMIC_READ(prev->node));
return NULL;
}

if (ATOMIC_READ(entry->prev) != ATOMIC_READ(prev->node)) {
release_ref(entry->refcnt, ATOMIC_READ(entry->node));
continue;
}

refcnt_node_t *link1 = ATOMIC_READ(entry->next);

if (ATOMIC_CMPXCHG(entry->next, REFCNT_MARK_OFF(link1), REFCNT_MARK_ON(link1))) {
refcnt_node_t *link2;
do {
link2 = ATOMIC_READ(entry->prev);
} while (!ATOMIC_CMPXCHG(entry->prev, link2, REFCNT_MARK_ON(link2)));
queue_entry_t *prev2 = help_delete(entry);
queue_entry_t *next = get_node_ptr(link1);
help_insert(prev2, next);
release_ref(prev2->refcnt, ATOMIC_READ(prev2->node));

queue_entry_t *next = NULL;
do {
if (next)
release_ref(next->refcnt, ATOMIC_READ(next->node));
next = get_node_ptr(deref_link_d(entry->refcnt, &entry->next));
} while(!ATOMIC_CMPXCHG(next->prev, ATOMIC_READ(entry->node), ATOMIC_READ(prev->node)));

release_ref(entry->refcnt, entry->node);
retain_ref(prev->refcnt, ATOMIC_READ(prev->node));

if (ATOMIC_CMPXCHG(prev->next, ATOMIC_READ(entry->node), ATOMIC_READ(next->node))) {
release_ref(entry->refcnt, ATOMIC_READ(entry->node));
retain_ref(next->refcnt, ATOMIC_READ(next->node));
}

if (next)
release_ref(next->refcnt, ATOMIC_READ(next->node));

v = entry->value;
break;
}
release_ref(entry->refcnt, ATOMIC_READ(entry->node));
}
release_ref(prev->refcnt, ATOMIC_READ(prev->node));
// XXX - implement
//remove_cross_reference(entry);
if (entry) {
ATOMIC_DECREMENT(q->length, 1);
release_ref(entry->refcnt, ATOMIC_READ(entry->node));
Expand All @@ -464,29 +483,44 @@ queue_pop_right(queue_t *q)
retain_ref(next->refcnt, ATOMIC_READ(next->node));
while(1) {
entry = get_node_ptr(deref_link(next->refcnt, &next->prev));
if (!entry)
continue;

if (ATOMIC_READ(entry->next) != ATOMIC_READ(next->node)) {
release_ref(entry->refcnt, ATOMIC_READ(entry->node));
continue;
}
if (!entry || entry == ATOMIC_READ(q->head)) {
if (entry)
release_ref(q->refcnt, ATOMIC_READ(entry->node));
release_ref(q->refcnt, ATOMIC_READ(next->node));
return NULL;
}

if (ATOMIC_READ(entry->next) != ATOMIC_READ(next->node)) {
release_ref(entry->refcnt, ATOMIC_READ(entry->node));
continue;
}

refcnt_node_t *link1 = ATOMIC_READ(entry->prev);

if (ATOMIC_CMPXCHG(entry->prev, REFCNT_MARK_OFF(link1), REFCNT_MARK_ON(link1))) {
refcnt_node_t *link2;
do {
link2 = ATOMIC_READ(entry->next);
} while (!ATOMIC_CMPXCHG(entry->next, link2, REFCNT_MARK_ON(link2)));
queue_entry_t *prev2 = help_delete(entry);
queue_entry_t *next = get_node_ptr(link2);
prev2 = help_insert(prev2, next);
release_ref(prev2->refcnt, ATOMIC_READ(prev2->node));

queue_entry_t *prev = NULL;
do {
if (prev)
release_ref(prev->refcnt, ATOMIC_READ(prev->node));
prev = get_node_ptr(deref_link_d(entry->refcnt, &entry->prev));
} while(!ATOMIC_CMPXCHG(prev->next, ATOMIC_READ(entry->node), ATOMIC_READ(next->node)));
release_ref(entry->refcnt, entry->node);
retain_ref(next->refcnt, ATOMIC_READ(next->node));

if (ATOMIC_CMPXCHG(next->prev, ATOMIC_READ(entry->node), ATOMIC_READ(prev->node))) {
release_ref(entry->refcnt, ATOMIC_READ(entry->node));
retain_ref(prev->refcnt, ATOMIC_READ(prev->node));
}

if (prev)
release_ref(prev->refcnt, ATOMIC_READ(prev->node));

v = entry->value;
break;
}
Expand Down

0 comments on commit b5bec63

Please sign in to comment.