Skip to content

Commit

Permalink
cothread_pool (#16)
Browse files Browse the repository at this point in the history
* thread pool, worker and strand
* latch for coroutines
  • Loading branch information
kelbon authored Feb 27, 2024
1 parent 71ca53d commit d1d8e08
Show file tree
Hide file tree
Showing 14 changed files with 983 additions and 135 deletions.
65 changes: 34 additions & 31 deletions CMakePresets.json
Original file line number Diff line number Diff line change
@@ -1,34 +1,37 @@
{
"version": 5,
"configurePresets": [
{
"name": "debug_dev",
"generator": "Ninja",
"binaryDir": "${sourceDir}/build_debug",
"cacheVariables": {
"CMAKE_EXPORT_COMPILE_COMMANDS": "1",
"CMAKE_BUILD_TYPE": "Debug",
"KELCORO_ENABLE_TESTING": "ON"
}
},
{
"name": "release_dev",
"generator": "Ninja",
"binaryDir": "${sourceDir}/build_release",
"cacheVariables": {
"CMAKE_EXPORT_COMPILE_COMMANDS": "1",
"CMAKE_BUILD_TYPE": "Release",
"KELCORO_ENABLE_TESTING": "ON"
}
},
{
"name": "default",
"generator": "Ninja",
"binaryDir": "${sourceDir}/build",
"cacheVariables": {
"CMAKE_EXPORT_COMPILE_COMMANDS": "1",
"CMAKE_BUILD_TYPE": "Release"
}
"version": 5,
"configurePresets": [
{
"name": "debug_dev",
"generator": "Ninja",
"binaryDir": "${sourceDir}/build_debug",
"cacheVariables": {
"CMAKE_EXPORT_COMPILE_COMMANDS": "1",
"CMAKE_BUILD_TYPE": "Debug",
"KELCORO_ENABLE_TESTING": "ON",
"CMAKE_CXX_STANDARD": "20"
}
]
},
{
"name": "release_dev",
"generator": "Ninja",
"binaryDir": "${sourceDir}/build_release",
"cacheVariables": {
"CMAKE_EXPORT_COMPILE_COMMANDS": "1",
"CMAKE_BUILD_TYPE": "Release",
"KELCORO_ENABLE_TESTING": "ON",
"CMAKE_CXX_STANDARD": "20"
}
},
{
"name": "default",
"generator": "Ninja",
"binaryDir": "${sourceDir}/build",
"cacheVariables": {
"CMAKE_EXPORT_COMPILE_COMMANDS": "1",
"CMAKE_BUILD_TYPE": "Release",
"CMAKE_CXX_STANDARD": "20"
}
}
]
}
10 changes: 7 additions & 3 deletions include/channel.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@
#endif
namespace dd {

// behavior very similar to generator, but channel may suspend before co_yield

template <yieldable Yield>
struct channel_promise : not_movable {
using handle_type = std::coroutine_handle<channel_promise>;
Expand Down Expand Up @@ -61,7 +59,6 @@ struct channel_promise : not_movable {
public:
constexpr channel_promise() noexcept {
}

channel<Yield> get_return_object() noexcept {
return channel<Yield>(self_handle());
}
Expand Down Expand Up @@ -317,6 +314,13 @@ using channel = ::dd::channel_r<Y, polymorphic_resource>;

}

template <yieldable Y>
struct operation_hash<std::coroutine_handle<channel_promise<Y>>> {
size_t operator()(std::coroutine_handle<channel_promise<Y>> handle) const noexcept {
return std::hash<const void*>()(handle.promise().root);
}
};

// usage example:
// co_foreach(std::string s, mychannel) use(s);
// OR
Expand Down
142 changes: 115 additions & 27 deletions include/common.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,16 @@
#include <thread>
#include <memory_resource>

#include "operation_hash.hpp"

#define KELCORO_CO_AWAIT_REQUIRED [[nodiscard("forget co_await?")]]

#if defined(__GNUC__) || defined(__clang__)
#define KELCORO_UNREACHABLE __builtin_unreachable()
#elif defined(_MSC_VER)
#define KELCORO_UNREACHABLE __assume(false)
#else
#define KELCORO_UNREACHABLE (void)0
#define KELCORO_UNREACHABLE assert(false)
#endif

#if !defined(NDEBUG)
Expand Down Expand Up @@ -76,6 +78,14 @@

namespace dd {

#ifdef __cpp_lib_hardware_interference_size
using std::hardware_constructive_interference_size;
using std::hardware_destructive_interference_size;
#else
constexpr std::size_t hardware_constructive_interference_size = 64;
constexpr std::size_t hardware_destructive_interference_size = 64;
#endif

struct not_movable {
constexpr not_movable() noexcept = default;
not_movable(not_movable&&) = delete;
Expand Down Expand Up @@ -158,6 +168,8 @@ struct polymorphic_resource {
passed = std::pmr::get_default_resource();
assert(passed != nullptr);
}
polymorphic_resource(std::pmr::memory_resource& m) noexcept : passed(&m) {
}
void* allocate(size_t sz, size_t align) {
return passed->allocate(sz, align);
}
Expand Down Expand Up @@ -201,6 +213,7 @@ struct with_resource {
};
template <typename X>
with_resource(X&&) -> with_resource<std::remove_cvref_t<X>>;
with_resource(std::pmr::memory_resource&) -> with_resource<pmr::polymorphic_resource>;

namespace noexport {

Expand All @@ -211,6 +224,26 @@ struct find_resource_tag<with_resource<R>, Args...> : std::type_identity<R> {};
template <typename Head, typename... Tail>
struct find_resource_tag<Head, Tail...> : find_resource_tag<Tail...> {};

template <typename E>
struct default_jump_on {
using stored_t = std::conditional_t<std::is_empty_v<E>, E, E&>;
KELCORO_NO_UNIQUE_ADDRESS
stored_t e;

static_assert(std::is_same_v<std::decay_t<E>, E>);

constexpr default_jump_on(stored_t e) noexcept : e(static_cast<E&&>(e)) {
}
static constexpr bool await_ready() noexcept {
return false;
}
constexpr void await_suspend(std::coroutine_handle<> handle) const {
e.execute(handle);
}
static constexpr void await_resume() noexcept {
}
};

} // namespace noexport

// searches for 'with_resource' in Types and returns first finded or void if no such
Expand Down Expand Up @@ -342,6 +375,15 @@ struct KELCORO_MSVC_EBO resourced_promise : Promise, overload_new_delete<R> {
// standard wording goes wrong
};

template <typename Promise, memory_resource R>
struct operation_hash<std::coroutine_handle<resourced_promise<Promise, R>>> {
size_t operator()(std::coroutine_handle<resourced_promise<Promise, R>> h) const {
return operation_hash<std::coroutine_handle<Promise>>()(
// assume addresses are same (dirty hack for supporting allocators)
std::coroutine_handle<Promise>::from_address(h.address()));
}
};

// 'teaches' promise to return
template <typename T>
struct return_block {
Expand Down Expand Up @@ -400,49 +442,38 @@ struct [[nodiscard("Dont forget to name it!")]] scope_exit {
}
};

template <typename T>
concept executor = requires(T& value) { value.execute([] {}); };
// ADL customization point, may be overloaded for your executor type, should return awaitable which
// schedules execution of coroutine to 'e'
KELCORO_CO_AWAIT_REQUIRED constexpr co_awaiter auto jump_on(auto& e) noexcept {
return noexport::default_jump_on<std::remove_cvref_t<decltype(e)>>(e);
}

// DEFAULT EXECUTORS

struct noop_executor {
struct noop_executor_t {
template <std::invocable F>
void execute(F&&) const noexcept {
static void execute(F&&) noexcept {
}
};

struct this_thread_executor {
constexpr inline noop_executor_t noop_executor;

struct this_thread_executor_t {
template <std::invocable F>
void execute(F&& f) const noexcept(std::is_nothrow_invocable_v<F&&>) {
static void execute(F&& f) noexcept(std::is_nothrow_invocable_v<F&&>) {
(void)std::forward<F>(f)();
}
};
constexpr inline this_thread_executor_t this_thread_executor;

struct new_thread_executor {
struct new_thread_executor_t {
template <std::invocable F>
void execute(F&& f) const {
static void execute(F&& f) {
std::thread([foo = std::forward<F>(f)]() mutable { (void)std::forward<F>(foo)(); }).detach();
}
};

template <executor E>
struct KELCORO_CO_AWAIT_REQUIRED jump_on {
KELCORO_NO_UNIQUE_ADDRESS E e;
#if __cpp_aggregate_paren_init < 201902L
constexpr jump_on(std::type_identity_t<E> e) noexcept : e(static_cast<E&&>(e)) {
}
#endif
static constexpr bool await_ready() noexcept {
return false;
}
constexpr void await_suspend(std::coroutine_handle<> handle) const {
e.execute(handle);
}
static constexpr void await_resume() noexcept {
}
};
template <typename E>
jump_on(E&&) -> jump_on<E>;
constexpr inline new_thread_executor_t new_thread_executor;

struct KELCORO_CO_AWAIT_REQUIRED get_handle_t {
private:
Expand All @@ -467,11 +498,68 @@ struct KELCORO_CO_AWAIT_REQUIRED get_handle_t {
}
};

// destroys coroutine in which awaited for
struct KELCORO_CO_AWAIT_REQUIRED destroy_coro_t {
static bool await_ready() noexcept {
return false;
}
static void await_suspend(std::coroutine_handle<> handle) noexcept {
handle.destroy();
}
static void await_resume() noexcept {
KELCORO_UNREACHABLE;
}
};

template <typename F>
struct KELCORO_CO_AWAIT_REQUIRED suspend_and_t {
KELCORO_NO_UNIQUE_ADDRESS F fn;

constexpr static bool await_ready() noexcept {
return false;
}
template <typename P>
constexpr auto await_suspend(std::coroutine_handle<P> handle) noexcept {
return fn(handle);
}
constexpr static void await_resume() noexcept {
}
};

template <typename F>
suspend_and_t(F&&) -> suspend_and_t<std::remove_cvref_t<F>>;

struct KELCORO_CO_AWAIT_REQUIRED op_hash_t {
operation_hash_t hash;

static constexpr bool await_ready() noexcept {
return false;
}
template <typename P>
constexpr bool await_suspend(std::coroutine_handle<P> handle) noexcept {
calculate_operation_hash(handle);
return false;
}
constexpr operation_hash_t await_resume() noexcept {
return hash;
}
};

namespace this_coro {

// provides access to inner handle of coroutine
constexpr inline get_handle_t handle = {};

constexpr inline destroy_coro_t destroy = {};

constexpr inline op_hash_t operation_hash = {};

// co_awaiting on this function suspends coroutine and invokes 'fn' with coroutine handle.
// await suspend returns what 'fn' returns!
constexpr auto suspend_and(auto&& fn) {
return suspend_and_t(std::forward<decltype(fn)>(fn));
}

} // namespace this_coro

// imitating compiler behaviour for co_await expression mutation into awaiter(without await_transform)
Expand Down
Loading

0 comments on commit d1d8e08

Please sign in to comment.