Skip to content

Commit 9edda73

Browse files
mgoulishmickkgiusti
authored
New locks in message.c to eliminate contention on content lock (#1752)
* New locks in message.c to eliminate contention on content lock * Improve naming, code consistency, and free the locks. --------- Co-authored-by: mick <[email protected]> Co-authored-by: Ken Giusti <[email protected]>
1 parent ad95db4 commit 9edda73

File tree

2 files changed

+21
-12
lines changed

2 files changed

+21
-12
lines changed

src/message.c

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1112,6 +1112,8 @@ qd_message_t *qd_message(void)
11121112

11131113
ZERO(msg->content);
11141114
sys_mutex_init(&msg->content->lock);
1115+
sys_mutex_init(&msg->content->producer_activation_lock);
1116+
sys_mutex_init(&msg->content->consumer_activation_lock);
11151117
sys_atomic_init(&msg->content->aborted, 0);
11161118
sys_atomic_init(&msg->content->discard, 0);
11171119
sys_atomic_init(&msg->content->no_body, 0);
@@ -1196,6 +1198,8 @@ void qd_message_free(qd_message_t *in_msg)
11961198
qd_buffer_free(content->pending);
11971199

11981200
sys_mutex_free(&content->lock);
1201+
sys_mutex_free(&content->consumer_activation_lock);
1202+
sys_mutex_free(&content->producer_activation_lock);
11991203
sys_atomic_destroy(&content->aborted);
12001204
sys_atomic_destroy(&content->discard);
12011205
sys_atomic_destroy(&content->no_body);
@@ -1602,11 +1606,11 @@ bool qd_message_has_data_in_content_or_pending_buffers(qd_message_t *msg)
16021606
static inline void activate_message_consumer(qd_message_t *stream)
16031607
{
16041608
qd_message_content_t *content = MSG_CONTENT(stream);
1605-
LOCK(&content->lock);
1609+
LOCK(&content->consumer_activation_lock);
16061610
if (content->uct_consumer_activation.type != QD_ACTIVATION_NONE) {
16071611
cutthrough_notify_buffers_produced_inbound(&content->uct_consumer_activation);
16081612
}
1609-
UNLOCK(&content->lock);
1613+
UNLOCK(&content->consumer_activation_lock);
16101614
}
16111615

16121616

@@ -1616,11 +1620,11 @@ static inline void activate_message_producer(qd_message_t *stream)
16161620

16171621
uint32_t full_slots = (sys_atomic_get(&content->uct_produce_slot) - sys_atomic_get(&content->uct_consume_slot)) % UCT_SLOT_COUNT;
16181622
if (full_slots < UCT_RESUME_THRESHOLD) {
1619-
LOCK(&content->lock);
1623+
LOCK(&content->producer_activation_lock);
16201624
if (content->uct_producer_activation.type != QD_ACTIVATION_NONE) {
16211625
cutthrough_notify_buffers_consumed_outbound(&content->uct_producer_activation);
16221626
}
1623-
UNLOCK(&content->lock);
1627+
UNLOCK(&content->producer_activation_lock);
16241628
}
16251629
}
16261630

@@ -3008,32 +3012,32 @@ int qd_message_consume_buffers(qd_message_t *stream, qd_buffer_list_t *buffers,
30083012
void qd_message_set_consumer_activation(qd_message_t *stream, qd_message_activation_t *activation)
30093013
{
30103014
qd_message_content_t *content = MSG_CONTENT(stream);
3011-
LOCK(&content->lock);
3015+
LOCK(&content->consumer_activation_lock);
30123016
content->uct_consumer_activation = *activation;
3013-
UNLOCK(&content->lock);
3017+
UNLOCK(&content->consumer_activation_lock);
30143018
}
30153019

30163020

30173021
void qd_message_cancel_consumer_activation(qd_message_t *stream)
30183022
{
30193023
qd_message_content_t *content = MSG_CONTENT(stream);
3020-
LOCK(&content->lock);
3024+
LOCK(&content->consumer_activation_lock);
30213025
content->uct_consumer_activation.type = QD_ACTIVATION_NONE;
3022-
UNLOCK(&content->lock);
3026+
UNLOCK(&content->consumer_activation_lock);
30233027
}
30243028

30253029
void qd_message_set_producer_activation(qd_message_t *stream, qd_message_activation_t *activation)
30263030
{
30273031
qd_message_content_t *content = MSG_CONTENT(stream);
3028-
LOCK(&content->lock);
3032+
LOCK(&content->producer_activation_lock);
30293033
content->uct_producer_activation = *activation;
3030-
UNLOCK(&content->lock);
3034+
UNLOCK(&content->producer_activation_lock);
30313035
}
30323036

30333037
void qd_message_cancel_producer_activation(qd_message_t *stream)
30343038
{
30353039
qd_message_content_t *content = MSG_CONTENT(stream);
3036-
LOCK(&content->lock);
3040+
LOCK(&content->producer_activation_lock);
30373041
content->uct_producer_activation.type = QD_ACTIVATION_NONE;
3038-
UNLOCK(&content->lock);
3042+
UNLOCK(&content->producer_activation_lock);
30393043
}

src/message_private.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,11 @@ typedef struct {
7575

7676
typedef struct {
7777
sys_mutex_t lock;
78+
sys_mutex_t producer_activation_lock; // These locks prevent either side from activating
79+
sys_mutex_t consumer_activation_lock; // the other during tear-down.
80+
// Using these locks, rather than the content lock
81+
// for this purpose, eliminates severe contention
82+
// that was observed on the content lock.
7883
sys_atomic_t ref_count; // The number of messages referencing this
7984
qd_buffer_list_t buffers; // The buffer chain containing the message
8085
qd_buffer_t *pending; // Buffer owned by and filled by qd_message_receive

0 commit comments

Comments
 (0)