Skip to content

Commit

Permalink
usefull (#26)
Browse files Browse the repository at this point in the history
* empty generator, raw handles
* task context
* when_all / when_any on tasks
  • Loading branch information
kelbon authored Sep 25, 2024
1 parent f9fd204 commit 12be8c3
Show file tree
Hide file tree
Showing 14 changed files with 957 additions and 178 deletions.
3 changes: 2 additions & 1 deletion .clang-format
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@ AllowShortFunctionsOnASingleLine: false
AllowShortLambdasOnASingleLine: true
AllowShortIfStatementsOnASingleLine: false
AllowShortLoopsOnASingleLine: false
SortIncludes: false
SortIncludes: false
IndentPPDirectives: BeforeHash
167 changes: 167 additions & 0 deletions include/kelcoro/algorithm.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
#pragma once

#include "kelcoro/noexport/expected.hpp"
#include "kelcoro/task.hpp"
#include "kelcoro/job.hpp"

namespace dd {

namespace noexport {

template <typename T, typename OwnerPromise, typename Ctx>
job job_for_when_all(task<T, Ctx>& child, std::coroutine_handle<OwnerPromise> owner,
expected<T, std::exception_ptr>& result, std::atomic<size_t>& count) {
co_await child.wait_with_proxy_owner(owner);

if (child.raw_handle().promise().exception) [[unlikely]]
result.data.template emplace<1>(child.raw_handle().promise().exception);
else {
if constexpr (!std::is_void_v<T>) {
result.data.template emplace<0>(child.raw_handle().promise().result());
} else {
result.data.template emplace<0>();
}
}
size_t i = count.fetch_sub(1, std::memory_order_acq_rel);
if (i == 1) // im last, now 'count' == 0
co_await this_coro::destroy_and_transfer_control_to(owner);
};

template <typename... Ts>
struct when_any_state {
std::mutex mtx;
std::variant<std::monostate, expected<Ts, std::exception_ptr>...> result;
size_t count_waiters = 0;
std::coroutine_handle<> owner = nullptr;

explicit when_any_state(size_t count_waiters, std::coroutine_handle<> owner) noexcept
: count_waiters(count_waiters), owner(owner) {
}

// returns owner if called must resume it
template <size_t I>
[[nodiscard]] std::coroutine_handle<> set_exception(std::exception_ptr p) noexcept {
std::lock_guard l(mtx);
if (!owner) // do not overwrite value, if set_result setted
return nullptr;
result.template emplace<I>(unexpected(std::move(p)));
--count_waiters;
// returns true only if all tasks ended with exception
return count_waiters == 0 ? owner : nullptr;
}
// returns owner if caller must resume it
template <size_t I, typename... Args>
[[nodiscard]] std::coroutine_handle<> set_result(Args&&... args) {
static_assert(I != 0);
std::lock_guard l(mtx);
if (!owner)
return nullptr;
result.template emplace<I>(std::forward<Args>(args)...);
return std::exchange(owner, nullptr);
}
};

template <size_t I, typename T, typename Ctx, typename... Ts>
job job_for_when_any(task<T, Ctx> child, std::weak_ptr<when_any_state<Ts...>> state) {
// stop at entry and give when_any do its preparations
co_await std::suspend_always{};
if (!state.lock()) {
// someone sets result while we was starting without awaiting
co_return;
}
// without proxy owner, because real owner may be destroyed while this task is running
co_await child.wait();
std::shared_ptr state_s = state.lock();
if (!state_s) // no one waits
co_return;
auto& child_promise = child.raw_handle().promise();
std::coroutine_handle<> owner;
if (child_promise.exception) [[unlikely]]
owner = state_s->template set_exception<I>(child_promise.exception);
else {
if constexpr (!std::is_void_v<T>) {
owner = state_s->template set_result<I>(child_promise.result());
} else {
owner = state_s->template set_result<I>();
}
}
if (owner)
co_await this_coro::destroy_and_transfer_control_to(owner);
}

} // namespace noexport

// all tasks contexts will be attached to one owner
// precondition: all tasks not .empty()
template <typename... Ts, typename Ctx>
auto when_all(task<Ts, Ctx>... tasks) -> task<std::tuple<expected<Ts, std::exception_ptr>...>, Ctx> {
assert((tasks && ...));
if constexpr (sizeof...(tasks) == 0)
co_return {};
std::atomic<size_t> count = sizeof...(tasks);
std::tuple<expected<Ts, std::exception_ptr>...> results;

co_await this_coro::suspend_and([&](auto when_all_handle) {
auto starter = [&](auto&... vals) {
(noexport::job_for_when_all(tasks, when_all_handle, vals, count), ...);
};
std::apply(starter, results);
});
co_return results;
}

// precondition: all tasks not .empty()
template <typename T, typename Ctx>
auto when_all(std::vector<task<T, Ctx>> tasks) -> task<std::vector<expected<T, std::exception_ptr>>, Ctx> {
assert(std::ranges::find_if(tasks, [](auto& t) { return t.empty(); }) == tasks.end());
if (tasks.empty())
co_return {};
std::atomic<size_t> count = tasks.size();
std::vector<expected<T, std::exception_ptr>> results(tasks.size());

co_await this_coro::suspend_and([&](auto when_all_handle) {
size_t i = 0;
for (auto& task : tasks)
noexport::job_for_when_all(task, when_all_handle, results[i++], count);
});
co_return results;
}

template <typename T, typename... Ts>
struct first_type {
using type = T;
};
// precondition: all tasks not .empty()
// returns first not failed or last exception if all failed
// result is never monostate (.index() > 0)
template <typename... Ts, typename... Ctx>
auto when_any(task<Ts, Ctx>... tasks)
-> task<std::variant<std::monostate, expected<Ts, std::exception_ptr>...>,
typename first_type<Ctx...>::type> {
assert((tasks && ...));
static_assert(sizeof...(tasks) != 0);

std::shared_ptr state =
std::make_shared<noexport::when_any_state<Ts...>>(sizeof...(tasks), co_await this_coro::handle);

co_await this_coro::suspend_and([&](std::coroutine_handle<>) {
[&]<size_t... Is>(std::index_sequence<Is...>) {
// guard for case when someone destroys 'when_all' while we are starting
// (return result and ends coroutine)
std::weak_ptr guard = state;
// to stack for case when one of them throws/returns without await and destroys 'when_any' task
// + 1 bcs of monostate in results
job jobs[] = {noexport::job_for_when_any<Is + 1>(std::move(tasks), guard)...};
// must not throw
for (job& j : jobs)
j.handle.resume();
}(std::index_sequence_for<Ts...>{});
});

co_return std::move(state->result);
}

// TODO when any dynamic count, fail-fast policy with returning ANY first result?
// TODO different contexts when_all Tuple contextes + attach one to one...

} // namespace dd
4 changes: 4 additions & 0 deletions include/kelcoro/channel.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,10 @@ struct channel : enable_resource_deduction {
[[nodiscard]] constexpr handle_type release() noexcept {
return std::exchange(top, nullptr);
}
[[nodiscard]] constexpr handle_type raw_handle() const noexcept {
return top;
}

// postcondition: .empty()
constexpr void clear() noexcept {
if (top) {
Expand Down
92 changes: 66 additions & 26 deletions include/kelcoro/common.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@ template <typename T>
struct return_block {
std::optional<T> storage = std::nullopt;

constexpr void return_value(T value) noexcept(std::is_nothrow_move_constructible_v<T>) {
storage.emplace(std::move(value));
template <typename U = T>
constexpr void return_value(U&& value) noexcept(std::is_nothrow_constructible_v<T, U&&>) {
storage.emplace(std::forward<U>(value));
}
constexpr T&& result() noexcept KELCORO_LIFETIMEBOUND {
assert(storage.has_value());
Expand Down Expand Up @@ -78,29 +79,6 @@ struct [[nodiscard("Dont forget to name it!")]] scope_exit {
}
};

struct KELCORO_CO_AWAIT_REQUIRED get_handle_t {
private:
struct return_handle {
std::coroutine_handle<> handle_;

static constexpr bool await_ready() noexcept {
return false;
}
KELCORO_ASSUME_NOONE_SEES bool await_suspend(std::coroutine_handle<> handle) noexcept {
handle_ = handle;
return false;
}
std::coroutine_handle<> await_resume() const noexcept {
return handle_;
}
};

public:
return_handle operator co_await() const noexcept {
return return_handle{};
}
};

// destroys coroutine in which awaited for
struct KELCORO_CO_AWAIT_REQUIRED destroy_coro_t {
static bool await_ready() noexcept {
Expand Down Expand Up @@ -134,6 +112,47 @@ suspend_and_t(F&&) -> suspend_and_t<std::remove_cvref_t<F>>;

namespace this_coro {

struct KELCORO_CO_AWAIT_REQUIRED get_handle_t {
template <typename PromiseType>
struct awaiter {
std::coroutine_handle<PromiseType> handle_;

static constexpr bool await_ready() noexcept {
return false;
}
KELCORO_ASSUME_NOONE_SEES bool await_suspend(std::coroutine_handle<PromiseType> handle) noexcept {
handle_ = handle;
return false;
}
[[nodiscard]] std::coroutine_handle<PromiseType> await_resume() const noexcept {
return handle_;
}
};

awaiter<void> operator co_await() const noexcept {
return awaiter<void>{};
}
};

struct get_context_t {
template <typename Ctx>
struct awaiter {
Ctx* ctx;

static bool await_ready() noexcept {
return false;
}
template <typename T>
bool await_suspend(std::coroutine_handle<T> h) {
ctx = std::addressof(h.promise().ctx);
return false;
}
[[nodiscard]] Ctx& await_resume() const noexcept {
return *ctx;
}
};
};

// provides access to inner handle of coroutine
constexpr inline get_handle_t handle = {};

Expand All @@ -145,6 +164,28 @@ constexpr auto suspend_and(auto&& fn) {
return suspend_and_t(std::forward<decltype(fn)>(fn));
}

struct [[nodiscard("co_await it!")]] destroy_and_transfer_control_to {
std::coroutine_handle<> who_waits;

#if !KELCORO_AGGREGATE_PAREN_INIT
destroy_and_transfer_control_to() = default;
explicit destroy_and_transfer_control_to(std::coroutine_handle<> h) noexcept : who_waits(h) {
}
#endif
static bool await_ready() noexcept {
return false;
}
KELCORO_ASSUME_NOONE_SEES std::coroutine_handle<> await_suspend(std::coroutine_handle<> self) noexcept {
// move it to stack memory to save from destruction
auto w = who_waits;
self.destroy();
return w ? w : std::noop_coroutine(); // symmetric transfer here
}
static void await_resume() noexcept {
KELCORO_UNREACHABLE;
}
};

} // namespace this_coro

template <typename T>
Expand All @@ -171,7 +212,6 @@ template <typename T>
concept co_awaitable = has_member_co_await<T> || has_global_co_await<T> || co_awaiter<T>;

// imitating compiler behaviour for co_await expression mutation into awaiter(without await_transform)
// very usefull if you have await_transform and for all other types you need default behavior
template <co_awaitable T>
[[nodiscard]] constexpr decltype(auto) build_awaiter(T&& value) {
static_assert(!ambigious_co_await_lookup<T>);
Expand Down
Loading

0 comments on commit 12be8c3

Please sign in to comment.