Skip to content

Commit

Permalink
perf: Properly distribute work to other thread in GrpcContext.run()
Browse files Browse the repository at this point in the history
  • Loading branch information
Tradias committed Sep 11, 2024
1 parent 07b04ea commit aad5681
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 30 deletions.
3 changes: 1 addition & 2 deletions src/agrpc/detail/atomic_intrusive_queue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
#include <agrpc/detail/intrusive_queue.hpp>

#include <atomic>
#include <utility>

#include <agrpc/detail/config.hpp>

Expand Down Expand Up @@ -102,7 +101,7 @@ class AtomicIntrusiveQueue
{
// Pick some pointer that is not nullptr and that is
// guaranteed to not be the address of a valid item.
const void* head_address = std::addressof(head_);
const void* head_address = &head_;
return const_cast<void*>(head_address);
}

Expand Down
7 changes: 4 additions & 3 deletions src/agrpc/detail/grpc_context_implementation.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,11 +133,12 @@ struct GrpcContextImplementation

[[nodiscard]] static bool running_in_this_thread(const agrpc::GrpcContext& grpc_context) noexcept;

static bool move_local_queue_to_remote_work(detail::GrpcContextThreadContext& context) noexcept;
[[nodiscard]] static bool move_local_queue_to_remote_work(detail::GrpcContextThreadContext& context) noexcept;

static bool move_remote_work_to_local_queue(detail::GrpcContextThreadContext& context) noexcept;
[[nodiscard]] static bool move_remote_work_to_local_queue(detail::GrpcContextThreadContext& context) noexcept;

static void distribute_local_work_to_other_threads(detail::GrpcContextThreadContext& context) noexcept;
[[nodiscard]] static bool distribute_all_local_work_to_other_threads_but_one(
detail::GrpcContextThreadContext& context) noexcept;

static bool process_local_queue(detail::GrpcContextThreadContext& context, detail::InvokeHandler invoke);

Expand Down
54 changes: 31 additions & 23 deletions src/agrpc/detail/grpc_context_implementation_definition.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -152,13 +152,23 @@ inline bool GrpcContextImplementation::move_remote_work_to_local_queue(
return true;
}

inline void GrpcContextImplementation::distribute_local_work_to_other_threads(
inline bool GrpcContextImplementation::distribute_all_local_work_to_other_threads_but_one(
detail::GrpcContextThreadContext& context) noexcept
{
if (GrpcContextImplementation::move_local_queue_to_remote_work(context))
auto& local_work_queue = context.local_work_queue_;
if (!local_work_queue.empty())
{
GrpcContextImplementation::trigger_work_alarm(context.grpc_context_);
const auto first = local_work_queue.pop_front();
bool needs_trigger = !local_work_queue.empty();
if (GrpcContextImplementation::move_local_queue_to_remote_work(context))
{
needs_trigger = false;
GrpcContextImplementation::trigger_work_alarm(context.grpc_context_);
}
local_work_queue.push_back(first);
return needs_trigger;
}
return false;
}

inline bool GrpcContextImplementation::process_local_queue(detail::GrpcContextThreadContext& context,
Expand Down Expand Up @@ -226,33 +236,31 @@ inline DoOneResult GrpcContextImplementation::do_one(detail::GrpcContextThreadCo
detail::InvokeHandler invoke)
{
const agrpc::GrpcContext& grpc_context = context.grpc_context_;
const bool multithreaded = context.multithreaded_;
auto& local_work_queue = context.local_work_queue_;
bool check_remote_work = context.check_remote_work_;
if (check_remote_work)
{
if (context.multithreaded_)
check_remote_work = GrpcContextImplementation::move_remote_work_to_local_queue(context);
context.check_remote_work_ = check_remote_work;
}
const auto distribute = [&]
{
if (multithreaded)
{
auto local_queue{std::move(context.local_work_queue_)};
check_remote_work = GrpcContextImplementation::move_remote_work_to_local_queue(context);
if (check_remote_work)
{
auto new_local_queue = detail::pop_front_each(local_queue, context.local_work_queue_);
context.local_work_queue_.append(std::move(local_queue));
GrpcContextImplementation::distribute_local_work_to_other_threads(context);
context.local_work_queue_ = std::move(new_local_queue);
}
else
if (GrpcContextImplementation::distribute_all_local_work_to_other_threads_but_one(context) &&
check_remote_work)
{
context.local_work_queue_ = std::move(local_queue);
GrpcContextImplementation::trigger_work_alarm(context.grpc_context_);
check_remote_work = false;
context.check_remote_work_ = check_remote_work;
}
}
else
{
check_remote_work = GrpcContextImplementation::move_remote_work_to_local_queue(context);
}
context.check_remote_work_ = check_remote_work;
}
};
distribute();
const bool processed_local_work = GrpcContextImplementation::process_local_queue(context, invoke);
const bool is_more_completed_work_pending = check_remote_work || !context.local_work_queue_.empty();
distribute();
const bool is_more_completed_work_pending = check_remote_work || !local_work_queue.empty();
if (!is_more_completed_work_pending && grpc_context.is_stopped())
{
return {{DoOneResult::PROCESSED_LOCAL_WORK}};
Expand Down Expand Up @@ -321,7 +329,7 @@ inline void GrpcContextImplementation::drain_completion_queue(agrpc::GrpcContext
{
detail::GrpcContextThreadContext thread_context{grpc_context, false};
(void)grpc_context.remote_work_queue_.try_mark_active();
GrpcContextImplementation::move_remote_work_to_local_queue(thread_context);
(void)GrpcContextImplementation::move_remote_work_to_local_queue(thread_context);
GrpcContextImplementation::process_local_queue(thread_context, detail::InvokeHandler::NO_);
while (GrpcContextImplementation::handle_next_completion_queue_event(
thread_context, detail::GrpcContextImplementation::INFINITE_FUTURE, detail::InvokeHandler::NO_)
Expand Down
39 changes: 37 additions & 2 deletions test/src/test_grpc_context_17.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -728,8 +728,6 @@ void recursively_post(agrpc::GrpcContext& grpc_context)

TEST_CASE_FIXTURE(test::GrpcContextTest, "GrpcContext.run() is not blocked by repeated asio::posts")
{
SUBCASE("single-threaded") {}
SUBCASE("multi-threaded") { grpc_context_lifetime.emplace(2); }
bool alarm_completed{false};
recursively_post(grpc_context);
agrpc::Alarm alarm{grpc_context};
Expand All @@ -748,6 +746,43 @@ TEST_CASE_FIXTURE(test::GrpcContextTest, "GrpcContext.run() is not blocked by re
grpc_context.run();
}

TEST_CASE_FIXTURE(test::GrpcContextTest, "GrpcContext.run() distributes local work to other threads")
{
bool nested_poll{};
SUBCASE("no nested poll") {}
SUBCASE("nested poll") { nested_poll = true; }
grpc_context_lifetime.emplace(2);
std::atomic<uint32_t> are_both_threads_used{false};
std::thread::id t_id;
auto check = [&]
{
are_both_threads_used |= t_id != std::this_thread::get_id();
std::this_thread::sleep_for(std::chrono::milliseconds(500));
};
post(
[&]
{
t_id = std::this_thread::get_id();
post(check);
post(check);
if (nested_poll)
{
grpc_context.poll();
}
});
std::thread t0{[&]
{
grpc_context.run();
}};
std::thread t1{[&]
{
grpc_context.run();
}};
t0.join();
t1.join();
CHECK_NE(0, are_both_threads_used.load());
}

TEST_CASE_FIXTURE(test::GrpcContextTest, "GrpcContext.run() interrupted by an exception will not forget local work")
{
grpc_context_lifetime.emplace(2);
Expand Down

0 comments on commit aad5681

Please sign in to comment.