From 5859e7a4f9d13cd587e288b4a2f53a39d0734219 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=B8=A3=E8=88=9F?= Date: Mon, 17 Jun 2024 07:18:03 +0000 Subject: [PATCH] fix batch mpmc queue --- common/lockfree_queue.h | 88 ++++++++++++++++------------------- common/test/test_lockfree.cpp | 26 ++++++++--- 2 files changed, 59 insertions(+), 55 deletions(-) diff --git a/common/lockfree_queue.h b/common/lockfree_queue.h index c7cbd719..7fda6da9 100644 --- a/common/lockfree_queue.h +++ b/common/lockfree_queue.h @@ -286,65 +286,59 @@ class LockfreeBatchMPMCRingQueue : public LockfreeRingQueueBase { using Base::empty; using Base::full; - size_t push_batch(const T* x, size_t n) { + size_t push_batch(const T *x, size_t n) { size_t rh, wt; wt = tail.load(std::memory_order_relaxed); for (;;) { - rh = head.load(std::memory_order_relaxed); - auto rn = std::min(n, Base::capacity - (wt - rh)); - if (rn == 0) return 0; - if (tail.compare_exchange_strong(wt, wt + rn, - std::memory_order_acq_rel)) { - auto first_idx = idx(wt); - auto part_length = Base::capacity - first_idx; - if (likely(part_length >= rn)) { - memcpy(&slots[first_idx], x, sizeof(T) * rn); - } else { - if (likely(part_length)) - memcpy(&slots[first_idx], x, sizeof(T) * (part_length)); - memcpy(&slots[0], x + part_length, - sizeof(T) * (rn - part_length)); - } - auto wh = wt; - while (!write_head.compare_exchange_weak( - wh, wt + rn, std::memory_order_acq_rel)) { - ThreadPause::pause(); - wh = wt; - } - return rn; + rh = head.load(std::memory_order_acquire); + auto wn = std::min(n, Base::capacity - (wt - rh)); + if (wn == 0) + return 0; + if (!tail.compare_exchange_strong(wt, wt + wn, std::memory_order_acq_rel)) + continue; + auto first_idx = idx(wt); + auto part_length = Base::capacity - first_idx; + if (likely(part_length >= wn)) { + memcpy(&slots[first_idx], x, sizeof(T) * wn); + } else { + if (likely(part_length)) + memcpy(&slots[first_idx], x, sizeof(T) * (part_length)); + memcpy(&slots[0], x + part_length, sizeof(T) * (wn - part_length)); } + auto wh = wt; + while (!write_head.compare_exchange_strong(wh, wt + wn, std::memory_order_acq_rel)) + wh = wt; + return wn; } } - bool push(const T& x) { return push_batch(&x, 1) == 1; } + bool push(const T &x) { + return push_batch(&x, 1) == 1; + } - size_t pop_batch(T* x, size_t n) { + size_t pop_batch(T *x, size_t n) { size_t rt, wh; rt = read_tail.load(std::memory_order_relaxed); for (;;) { - wh = write_head.load(std::memory_order_relaxed); + wh = write_head.load(std::memory_order_acquire); auto rn = std::min(n, wh - rt); - if (rn == 0) return 0; - if (read_tail.compare_exchange_strong(rt, rt + rn, - std::memory_order_acq_rel)) { - auto first_idx = idx(rt); - auto part_length = Base::capacity - first_idx; - if (likely(part_length >= rn)) { - memcpy(x, &slots[first_idx], sizeof(T) * rn); - } else { - if (likely(part_length)) - memcpy(x, &slots[first_idx], sizeof(T) * (part_length)); - memcpy(x + part_length, &slots[0], - sizeof(T) * (rn - part_length)); - } - auto rh = rt; - while (!head.compare_exchange_weak(rh, rt + rn, - std::memory_order_acq_rel)) { - ThreadPause::pause(); - rh = rt; - } - return rn; + if (rn == 0) + return 0; + if (!read_tail.compare_exchange_strong(rt, rt + rn, std::memory_order_acq_rel)) + continue; + auto first_idx = idx(rt); + auto part_length = Base::capacity - first_idx; + if (likely(part_length >= rn)) { + memcpy(x, &slots[first_idx], sizeof(T) * rn); + } else { + if (likely(part_length)) + memcpy(x, &slots[first_idx], sizeof(T) * (part_length)); + memcpy(x + part_length, &slots[0], sizeof(T) * (rn - part_length)); } + auto rh = rt; + while (!head.compare_exchange_strong(rh, rt + rn, std::memory_order_acq_rel)) + rh = rt; + return rn; } } @@ -444,7 +438,6 @@ class LockfreeSPSCRingQueue : public LockfreeRingQueueBase { n, Base::capacity - (t - head.load(std::memory_order_acquire))); if (n == 0) return 0; auto first_idx = idx(t); - auto last_idx = idx(t + n - 1); auto part_length = Base::capacity - first_idx; if (likely(part_length >= n)) { memcpy(&slots[first_idx], x, sizeof(T) * n); @@ -462,7 +455,6 @@ class LockfreeSPSCRingQueue : public LockfreeRingQueueBase { n = std::min(n, tail.load(std::memory_order_acquire) - h); if (n == 0) return 0; auto first_idx = idx(h); - auto last_idx = idx(h + n - 1); (void)last_idx; auto part_length = Base::capacity - first_idx; if (likely(part_length >= n)) { memcpy(x, &slots[first_idx], sizeof(T) * n); diff --git a/common/test/test_lockfree.cpp b/common/test/test_lockfree.cpp index 1227503b..7cc0f7f0 100644 --- a/common/test/test_lockfree.cpp +++ b/common/test/test_lockfree.cpp @@ -86,6 +86,7 @@ int test_queue(const char *name, QType &queue) { auto begin = std::chrono::steady_clock::now(); for (size_t i = 0; i < receiver_num; i++) { receivers.emplace_back([i, &queue] { + pthread_setname_np(pthread_self(), "Receiver"); photon::set_cpu_affinity(i); std::chrono::nanoseconds rspent(std::chrono::nanoseconds(0)); for (size_t x = 0; x < items_num / receiver_num; x++) { @@ -109,6 +110,7 @@ int test_queue(const char *name, QType &queue) { } for (size_t i = 0; i < sender_num; i++) { senders.emplace_back([i, &queue] { + pthread_setname_np(pthread_self(), "Sender"); photon::set_cpu_affinity(i); std::chrono::nanoseconds wspent{std::chrono::nanoseconds(0)}; for (size_t x = 0; x < items_num / sender_num; x++) { @@ -153,6 +155,7 @@ int test_queue_batch(const char *name, QType &queue) { auto begin = std::chrono::steady_clock::now(); for (size_t i = 0; i < receiver_num; i++) { receivers.emplace_back([i, &queue] { + pthread_setname_np(pthread_self(), "Receiver"); photon::set_cpu_affinity(i); int buffer[32]; size_t size; @@ -181,21 +184,30 @@ int test_queue_batch(const char *name, QType &queue) { } for (size_t i = 0; i < sender_num; i++) { senders.emplace_back([i, &queue] { + pthread_setname_np(pthread_self(), "Sender"); photon::set_cpu_affinity(i); - std::chrono::nanoseconds wspent{std::chrono::nanoseconds(0)}; + std::vector vec; + vec.resize(items_num / sender_num); for (size_t x = 0; x < items_num / sender_num; x++) { + vec[x] = x; + } + size_t size; + std::chrono::nanoseconds wspent{std::chrono::nanoseconds(0)}; + for (size_t x = 0; x < items_num / sender_num;) { auto tm = std::chrono::high_resolution_clock::now(); LSType::lock(wlock); - while (!queue.push(x)) { + while (!(size = queue.push_batch(&vec[x], std::min(32UL, vec.size() - x)))) { LSType::unlock(wlock); CPUPause::pause(); LSType::lock(wlock); } LSType::unlock(wlock); - wspent += std::chrono::high_resolution_clock::now() - tm; - sc[x]++; - scnt[i]++; - // ThreadPause::pause(); + wspent += (std::chrono::high_resolution_clock::now() - tm) / size; + for (auto y = x; y < x + size; y++) { + sc[y] ++; + scnt[i] ++; + } + x += size; } LOG_DEBUG("` sender done, ` ns per action", i, wspent.count() / (items_num / sender_num)); @@ -204,7 +216,7 @@ int test_queue_batch(const char *name, QType &queue) { for (auto &x : senders) x.join(); for (auto &x : receivers) x.join(); auto end = std::chrono::steady_clock::now(); - LOG_DEBUG("` ` p ` c, ` items, Spent ` us\n", name, sender_num, receiver_num, items_num, + LOG_DEBUG("` ` p ` c, ` items, Spent ` us", name, sender_num, receiver_num, items_num, std::chrono::duration_cast(end - begin).count()); for (size_t i = 0; i < items_num / sender_num; i++) { if (sc[i] != rc[i] || sc[i] != sender_num) {