Skip to content

Commit

Permalink
feat: Implement thread-safe GrpcContext.run/poll()
Browse files Browse the repository at this point in the history
  • Loading branch information
Tradias committed Jul 5, 2024
1 parent e26a39b commit 084c063
Show file tree
Hide file tree
Showing 22 changed files with 364 additions and 334 deletions.
52 changes: 15 additions & 37 deletions src/agrpc/detail/allocate_operation.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,53 +35,31 @@ template <class T>
inline constexpr bool IS_STD_ALLOCATOR<std::allocator<T>> = true;

template <template <class> class OperationTemplate, class Handler, class... Args>
auto allocate_custom_operation(Handler&& handler, Args&&... args)
{
const auto allocator = detail::get_allocator(handler);
auto operation = detail::allocate<OperationTemplate<detail::RemoveCrefT<Handler>>>(
allocator, detail::AllocationType::CUSTOM, static_cast<Handler&&>(handler), static_cast<Args&&>(args)...);
return operation.release();
}

template <template <class> class OperationTemplate, class Handler, class... Args>
auto allocate_local_operation(agrpc::GrpcContext& grpc_context, Handler&& handler, Args&&... args)
auto allocate_operation(Handler&& handler, Args&&... args)
{
using DecayedHandler = detail::RemoveCrefT<Handler>;
auto allocator = detail::get_allocator(handler);
if constexpr (detail::IS_STD_ALLOCATOR<decltype(allocator)>)
using Op = OperationTemplate<DecayedHandler>;
using Allocator = detail::AssociatedAllocatorT<DecayedHandler>;
if constexpr (detail::IS_STD_ALLOCATOR<Allocator>)
{
auto operation = detail::allocate<OperationTemplate<DecayedHandler>>(
grpc_context.get_allocator(), detail::AllocationType::LOCAL, static_cast<Handler&&>(handler),
static_cast<Args&&>(args)...);
if (GrpcContextImplementation::running_in_this_thread())
{
auto operation = detail::allocate<Op>(PoolResourceAllocator<Op>{}, detail::AllocationType::LOCAL,
static_cast<Handler&&>(handler), static_cast<Args&&>(args)...);
return operation.release();
}
auto operation = detail::allocate<Op>(std::allocator<Op>{}, detail::AllocationType::CUSTOM,
static_cast<Handler&&>(handler), static_cast<Args&&>(args)...);
return operation.release();
}
else
{
auto operation = detail::allocate<OperationTemplate<DecayedHandler>>(
allocator, detail::AllocationType::CUSTOM, static_cast<Handler&&>(handler), static_cast<Args&&>(args)...);
const auto allocator = detail::get_allocator(handler);
auto operation = detail::allocate<Op>(allocator, detail::AllocationType::CUSTOM,
static_cast<Handler&&>(handler), static_cast<Args&&>(args)...);
return operation.release();
}
}

template <template <class> class OperationTemplate, class Handler, class... Args>
auto allocate_operation(bool, agrpc::GrpcContext&, Handler&& handler, Args&&... args)
{
// if (is_running_in_this_thread)
// {
// return detail::allocate_local_operation<OperationTemplate>(grpc_context, static_cast<Handler&&>(handler),
// static_cast<Args&&>(args)...);
// }
return detail::allocate_custom_operation<OperationTemplate>(static_cast<Handler&&>(handler),
static_cast<Args&&>(args)...);
}

template <template <class> class OperationTemplate, class Handler, class... Args>
auto allocate_operation(agrpc::GrpcContext& grpc_context, Handler&& handler, Args&&... args)
{
return detail::allocate_operation<OperationTemplate>(
detail::GrpcContextImplementation::running_in_this_thread(grpc_context), grpc_context,
static_cast<Handler&&>(handler), static_cast<Args&&>(args)...);
}
}

AGRPC_NAMESPACE_END
Expand Down
5 changes: 0 additions & 5 deletions src/agrpc/detail/atomic_intrusive_queue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,6 @@ class AtomicIntrusiveQueue
return detail::IntrusiveQueue<Item>::make_reversed(static_cast<Item*>(old_value));
}

[[nodiscard]] bool is_inactive() const noexcept
{
return head_.load(std::memory_order_relaxed) == producer_inactive_value();
}

private:
[[nodiscard]] void* producer_inactive_value() const noexcept
{
Expand Down
2 changes: 1 addition & 1 deletion src/agrpc/detail/basic_sender.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ class BasicSenderRunningOperation : public detail::BaseForSenderImplementationTy
}

template <AllocationType, class... Args>
void complete(const agrpc::GrpcContext&, Args... args) noexcept
void complete(Args... args) noexcept
{
reset_stop_callback();
exec::set_value(static_cast<Receiver&&>(receiver()), static_cast<Args&&>(args)...);
Expand Down
4 changes: 1 addition & 3 deletions src/agrpc/detail/create_and_submit_no_arg_operation.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,13 @@ void create_and_submit_no_arg_operation(agrpc::GrpcContext& grpc_context, Handle
}
}
detail::StartWorkAndGuard guard{grpc_context};
auto operation = detail::allocate_operation<detail::NoArgOperation>(static_cast<Handler&&>(handler));
if (is_running_in_this_thread)
{
auto operation =
detail::allocate_local_operation<detail::NoArgOperation>(grpc_context, static_cast<Handler&&>(handler));
detail::GrpcContextImplementation::add_local_operation(operation);
}
else
{
auto operation = detail::allocate_custom_operation<detail::NoArgOperation>(static_cast<Handler&&>(handler));
detail::GrpcContextImplementation::add_remote_operation(grpc_context, operation);
}
guard.release();
Expand Down
32 changes: 30 additions & 2 deletions src/agrpc/detail/grpc_context.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ struct AlwaysFalsePredicate
inline void drain_completion_queue(agrpc::GrpcContext& grpc_context)
{
detail::GrpcContextThreadContext thread_context{grpc_context};
detail::ThreadLocalGrpcContextGuard guard{thread_context};
while (detail::GrpcContextImplementation::do_one(thread_context, detail::GrpcContextImplementation::INFINITE_FUTURE,
detail::InvokeHandler::NO_, detail::AlwaysFalsePredicate{}))
{
Expand All @@ -53,17 +52,45 @@ inline grpc::CompletionQueue* get_completion_queue(agrpc::GrpcContext& grpc_cont
{
return grpc_context.get_completion_queue();
}

template <class T>
inline void create_resources(T& resources, std::size_t thread_count_hint = 1)
{
for (size_t i{}; i != thread_count_hint; ++i)
{
auto resource = new detail::StackablePoolResource();
resources.push_front(*resource);
}
}

template <class T>
inline void delete_resources(T& resources)
{
while (!resources.empty())
{
delete &resources.pop_front();
}
}
} // namespace detail

inline GrpcContext::GrpcContext() { detail::create_resources(resources_); }

inline GrpcContext::GrpcContext(std::size_t thread_count_hint) : multithreaded_{thread_count_hint > 1}
{
detail::create_resources(resources_, thread_count_hint);
}

template <class>
inline GrpcContext::GrpcContext(std::unique_ptr<grpc::CompletionQueue>&& completion_queue)
: completion_queue_(static_cast<std::unique_ptr<grpc::CompletionQueue>&&>(completion_queue))
{
detail::create_resources(resources_);
}

inline GrpcContext::GrpcContext(std::unique_ptr<grpc::ServerCompletionQueue> completion_queue)
: completion_queue_(static_cast<std::unique_ptr<grpc::ServerCompletionQueue>&&>(completion_queue))
{
detail::create_resources(resources_);
}

inline GrpcContext::~GrpcContext()
Expand All @@ -76,6 +103,7 @@ inline GrpcContext::~GrpcContext()
asio::execution_context::shutdown();
asio::execution_context::destroy();
#endif
detail::delete_resources(resources_);
}

inline bool GrpcContext::run()
Expand Down Expand Up @@ -161,7 +189,7 @@ inline GrpcContext::executor_type GrpcContext::get_executor() noexcept { return

inline GrpcContext::executor_type GrpcContext::get_scheduler() noexcept { return GrpcContext::executor_type{*this}; }

inline GrpcContext::allocator_type GrpcContext::get_allocator() noexcept { return allocator_type{&local_resource_}; }
inline GrpcContext::allocator_type GrpcContext::get_allocator() noexcept { return allocator_type{}; }

inline void GrpcContext::work_started() noexcept { outstanding_work_.fetch_add(1, std::memory_order_relaxed); }

Expand Down
47 changes: 23 additions & 24 deletions src/agrpc/detail/grpc_context_implementation.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
#include <agrpc/detail/grpc_completion_queue_event.hpp>
#include <agrpc/detail/intrusive_queue.hpp>
#include <agrpc/detail/operation_base.hpp>
#include <agrpc/detail/pool_resource.hpp>
#include <agrpc/detail/stackable_pool_resource.hpp>
#include <agrpc/detail/utility.hpp>
#include <grpcpp/completion_queue.h>

Expand Down Expand Up @@ -58,40 +60,31 @@ struct StartWorkAndGuard : detail::WorkFinishedOnExit
StartWorkAndGuard& operator=(StartWorkAndGuard&&) = delete;
};

struct GrpcLocalContext
struct GrpcContextThreadContext
#if defined(AGRPC_STANDALONE_ASIO) || defined(AGRPC_BOOST_ASIO)
// Enables Boost.Asio's awaitable frame memory recycling
: asio::detail::thread_context
#endif
{
explicit GrpcLocalContext(agrpc::GrpcContext& grpc_context);
explicit GrpcContextThreadContext(agrpc::GrpcContext& grpc_context);

~GrpcContextThreadContext() noexcept;

GrpcContextThreadContext(const GrpcContextThreadContext& other) = delete;
GrpcContextThreadContext(GrpcContextThreadContext&& other) = delete;
GrpcContextThreadContext& operator=(const GrpcContextThreadContext& other) = delete;
GrpcContextThreadContext& operator=(GrpcContextThreadContext&& other) = delete;

agrpc::GrpcContext& grpc_context_;
detail::IntrusiveQueue<detail::QueueableOperationBase> local_work_queue_;
bool check_remote_work_;
};
GrpcContextThreadContext* old_context_;
detail::StackablePoolResource& resource_;

#if defined(AGRPC_STANDALONE_ASIO) || defined(AGRPC_BOOST_ASIO)
// Enables Boost.Asio's awaitable frame memory recycling
struct GrpcContextThreadContext : GrpcLocalContext, asio::detail::thread_context
{
using GrpcLocalContext::GrpcLocalContext;

asio::detail::thread_info_base this_thread_;
thread_call_stack::context ctx_{this, this_thread_};
};
#else
using GrpcContextThreadContext = GrpcLocalContext;
#endif

struct ThreadLocalGrpcContextGuard
{
detail::GrpcContextThreadContext* old_context_;

explicit ThreadLocalGrpcContextGuard(detail::GrpcContextThreadContext& context) noexcept;

~ThreadLocalGrpcContextGuard();

ThreadLocalGrpcContextGuard(const ThreadLocalGrpcContextGuard&) = delete;
ThreadLocalGrpcContextGuard(ThreadLocalGrpcContextGuard&&) = delete;
ThreadLocalGrpcContextGuard& operator=(const ThreadLocalGrpcContextGuard&) = delete;
ThreadLocalGrpcContextGuard& operator=(ThreadLocalGrpcContextGuard&&) = delete;
};

struct IsGrpcContextStoppedPredicate
Expand Down Expand Up @@ -126,6 +119,8 @@ struct GrpcContextImplementation
static bool handle_next_completion_queue_event(detail::GrpcContextThreadContext& context, ::gpr_timespec deadline,
detail::InvokeHandler invoke);

[[nodiscard]] static bool running_in_this_thread() noexcept;

[[nodiscard]] static bool running_in_this_thread(const agrpc::GrpcContext& grpc_context) noexcept;

static bool move_local_queue_to_remote_work(detail::GrpcContextThreadContext& context) noexcept;
Expand All @@ -147,6 +142,10 @@ struct GrpcContextImplementation

template <class LoopFunction>
static bool process_work(agrpc::GrpcContext& grpc_context, LoopFunction loop_function);

static detail::StackablePoolResource& pop_resource(agrpc::GrpcContext& grpc_context);

static void push_resource(agrpc::GrpcContext& grpc_context, detail::StackablePoolResource& resource);
};

void process_grpc_tag(void* tag, detail::OperationResult result, agrpc::GrpcContext& grpc_context);
Expand Down
Loading

0 comments on commit 084c063

Please sign in to comment.