Skip to content

Commit

Permalink
perf: Improve distribution of work to other threads in GrpcContext
Browse files Browse the repository at this point in the history
  • Loading branch information
Tradias committed Sep 12, 2024
1 parent aad5681 commit 06627d6
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 13 deletions.
20 changes: 19 additions & 1 deletion src/agrpc/detail/atomic_intrusive_queue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,25 @@ class AtomicIntrusiveQueue
return old_value == inactive;
}

bool try_mark_inactive() noexcept
// Returns true if the producer is inactive and needs to be
// woken up. The calling thread has responsibility for waking
// up the producer.
[[nodiscard]] bool prepend(IntrusiveQueue<Item> items) noexcept
{
if (items.empty())
{
return false;
}
const void* const inactive = producer_inactive_value();
void* old_value = head_.load(std::memory_order_relaxed);
do
{
items.tail_->next_ = (old_value == inactive) ? nullptr : static_cast<Item*>(old_value);
} while (!head_.compare_exchange_weak(old_value, items.head_, std::memory_order_acq_rel));
return old_value == inactive;
}

[[nodiscard]] bool try_mark_inactive() noexcept
{
void* const inactive = producer_inactive_value();
if (void* old_value = head_.load(std::memory_order_relaxed); old_value == nullptr)
Expand Down
38 changes: 26 additions & 12 deletions src/agrpc/detail/grpc_context_implementation_definition.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -126,17 +126,7 @@ inline bool GrpcContextImplementation::move_local_queue_to_remote_work(
detail::GrpcContextThreadContext& context) noexcept
{
agrpc::GrpcContext& grpc_context = context.grpc_context_;
bool is_queue_inactive{};
auto queue{std::move(context.local_work_queue_)};
while (!queue.empty())
{
auto* op = queue.pop_front();
if (grpc_context.remote_work_queue_.enqueue(op))
{
is_queue_inactive = true;
}
}
return is_queue_inactive;
return grpc_context.remote_work_queue_.prepend(std::move(context.local_work_queue_));
}

inline bool GrpcContextImplementation::move_remote_work_to_local_queue(
Expand Down Expand Up @@ -241,7 +231,31 @@ inline DoOneResult GrpcContextImplementation::do_one(detail::GrpcContextThreadCo
bool check_remote_work = context.check_remote_work_;
if (check_remote_work)
{
check_remote_work = GrpcContextImplementation::move_remote_work_to_local_queue(context);
if (multithreaded)
{
if (local_work_queue.empty())
{
check_remote_work = GrpcContextImplementation::move_remote_work_to_local_queue(context);
if (check_remote_work)
{
check_remote_work = GrpcContextImplementation::move_remote_work_to_local_queue(context);
}
if (check_remote_work)
{
GrpcContextImplementation::trigger_work_alarm(context.grpc_context_);
check_remote_work = false;
}
}
else if (local_work_queue.has_exactly_one_element())
{
GrpcContextImplementation::trigger_work_alarm(context.grpc_context_);
check_remote_work = false;
}
}
else
{
check_remote_work = GrpcContextImplementation::move_remote_work_to_local_queue(context);
}
context.check_remote_work_ = check_remote_work;
}
const auto distribute = [&]
Expand Down
6 changes: 6 additions & 0 deletions src/agrpc/detail/intrusive_queue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
#ifndef AGRPC_DETAIL_INTRUSIVE_QUEUE_HPP
#define AGRPC_DETAIL_INTRUSIVE_QUEUE_HPP

#include <agrpc/detail/forward.hpp>

#include <utility>

#include <agrpc/detail/config.hpp>
Expand Down Expand Up @@ -67,6 +69,8 @@ class IntrusiveQueue

[[nodiscard]] bool empty() const noexcept { return head_ == nullptr; }

[[nodiscard]] bool has_exactly_one_element() const noexcept { return !empty() && head_ == tail_; }

[[nodiscard]] Item* pop_front() noexcept
{
Item* item = std::exchange(head_, head_->next_);
Expand Down Expand Up @@ -110,6 +114,8 @@ class IntrusiveQueue
}

private:
friend detail::AtomicIntrusiveQueue<Item>;

Item* head_{nullptr};
Item* tail_{nullptr};
};
Expand Down

0 comments on commit 06627d6

Please sign in to comment.