Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 16 additions & 12 deletions src/message.c
Original file line number Diff line number Diff line change
Expand Up @@ -1112,6 +1112,8 @@ qd_message_t *qd_message(void)

ZERO(msg->content);
sys_mutex_init(&msg->content->lock);
sys_mutex_init(&msg->content->producer_activation_lock);
sys_mutex_init(&msg->content->consumer_activation_lock);
sys_atomic_init(&msg->content->aborted, 0);
sys_atomic_init(&msg->content->discard, 0);
sys_atomic_init(&msg->content->no_body, 0);
Expand Down Expand Up @@ -1196,6 +1198,8 @@ void qd_message_free(qd_message_t *in_msg)
qd_buffer_free(content->pending);

sys_mutex_free(&content->lock);
sys_mutex_free(&content->consumer_activation_lock);
sys_mutex_free(&content->producer_activation_lock);
sys_atomic_destroy(&content->aborted);
sys_atomic_destroy(&content->discard);
sys_atomic_destroy(&content->no_body);
Expand Down Expand Up @@ -1602,11 +1606,11 @@ bool qd_message_has_data_in_content_or_pending_buffers(qd_message_t *msg)
static inline void activate_message_consumer(qd_message_t *stream)
{
qd_message_content_t *content = MSG_CONTENT(stream);
LOCK(&content->lock);
LOCK(&content->consumer_activation_lock);
if (content->uct_consumer_activation.type != QD_ACTIVATION_NONE) {
cutthrough_notify_buffers_produced_inbound(&content->uct_consumer_activation);
}
UNLOCK(&content->lock);
UNLOCK(&content->consumer_activation_lock);
}


Expand All @@ -1616,11 +1620,11 @@ static inline void activate_message_producer(qd_message_t *stream)

uint32_t full_slots = (sys_atomic_get(&content->uct_produce_slot) - sys_atomic_get(&content->uct_consume_slot)) % UCT_SLOT_COUNT;
if (full_slots < UCT_RESUME_THRESHOLD) {
LOCK(&content->lock);
LOCK(&content->producer_activation_lock);
if (content->uct_producer_activation.type != QD_ACTIVATION_NONE) {
cutthrough_notify_buffers_consumed_outbound(&content->uct_producer_activation);
}
UNLOCK(&content->lock);
UNLOCK(&content->producer_activation_lock);
}
}

Expand Down Expand Up @@ -3008,32 +3012,32 @@ int qd_message_consume_buffers(qd_message_t *stream, qd_buffer_list_t *buffers,
void qd_message_set_consumer_activation(qd_message_t *stream, qd_message_activation_t *activation)
{
qd_message_content_t *content = MSG_CONTENT(stream);
LOCK(&content->lock);
LOCK(&content->consumer_activation_lock);
content->uct_consumer_activation = *activation;
UNLOCK(&content->lock);
UNLOCK(&content->consumer_activation_lock);
}


void qd_message_cancel_consumer_activation(qd_message_t *stream)
{
qd_message_content_t *content = MSG_CONTENT(stream);
LOCK(&content->lock);
LOCK(&content->consumer_activation_lock);
content->uct_consumer_activation.type = QD_ACTIVATION_NONE;
UNLOCK(&content->lock);
UNLOCK(&content->consumer_activation_lock);
}

void qd_message_set_producer_activation(qd_message_t *stream, qd_message_activation_t *activation)
{
qd_message_content_t *content = MSG_CONTENT(stream);
LOCK(&content->lock);
LOCK(&content->producer_activation_lock);
content->uct_producer_activation = *activation;
UNLOCK(&content->lock);
UNLOCK(&content->producer_activation_lock);
}

void qd_message_cancel_producer_activation(qd_message_t *stream)
{
qd_message_content_t *content = MSG_CONTENT(stream);
LOCK(&content->lock);
LOCK(&content->producer_activation_lock);
content->uct_producer_activation.type = QD_ACTIVATION_NONE;
UNLOCK(&content->lock);
UNLOCK(&content->producer_activation_lock);
}
5 changes: 5 additions & 0 deletions src/message_private.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,11 @@ typedef struct {

typedef struct {
sys_mutex_t lock;
sys_mutex_t producer_activation_lock; // These locks prevent either side from activating
sys_mutex_t consumer_activation_lock; // the other during tear-down.
// Using these locks, rather than the content lock
// for this purpose, eliminates severe contention
// that was observed on the content lock.
sys_atomic_t ref_count; // The number of messages referencing this
qd_buffer_list_t buffers; // The buffer chain containing the message
qd_buffer_t *pending; // Buffer owned by and filled by qd_message_receive
Expand Down