Skip to content

Commit

Permalink
task exception support (#22)
Browse files Browse the repository at this point in the history
  • Loading branch information
kelbon authored Aug 7, 2024
1 parent 1d8cacf commit 9d26aa9
Show file tree
Hide file tree
Showing 8 changed files with 236 additions and 35 deletions.
17 changes: 12 additions & 5 deletions include/kelcoro/async_task.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ struct async_task;

template <typename Result>
struct async_task_promise : return_block<Result> {
std::exception_ptr exception = nullptr;
std::atomic_bool ready = false;
// only owner and coroutine itself are owners
std::atomic_int8_t ref_count = 1;
Expand All @@ -22,8 +23,9 @@ struct async_task_promise : return_block<Result> {
async_task<Result> get_return_object() {
return async_task<Result>(std::coroutine_handle<async_task_promise<Result>>::from_promise(*this));
}
[[noreturn]] void unhandled_exception() const noexcept {
std::terminate();
void unhandled_exception() noexcept {
exception = std::current_exception();
// goes to final suspend
}

private:
Expand Down Expand Up @@ -102,17 +104,22 @@ struct async_task : enable_resource_deduction {

// precondition: !empty()
// must be invoked in one thread(one consumer)
std::add_rvalue_reference_t<Result> get() && noexcept KELCORO_LIFETIMEBOUND {
std::add_rvalue_reference_t<Result> get() KELCORO_LIFETIMEBOUND {
assert(!empty());
wait();
// result always exist, its setted or std::terminate called on exception.
return handle.promise().result();
auto& promise = handle.promise();
if (promise.exception) [[unlikely]]
std::rethrow_exception(promise.exception);
return promise.result();
}

// return true if call to 'get' will produce UB
constexpr bool empty() const noexcept {
return handle == nullptr;
}
constexpr explicit operator bool() const noexcept {
return !empty();
}

~async_task() {
detach();
Expand Down
23 changes: 23 additions & 0 deletions include/kelcoro/channel.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,25 @@ struct channel : enable_resource_deduction {
}
};

struct next_awaiter : not_movable {
channel& self;

constexpr explicit next_awaiter(channel& c) noexcept : self(c) {
}
bool await_ready() const noexcept {
return self.empty();
}
KELCORO_ASSUME_NOONE_SEES std::coroutine_handle<> await_suspend(
std::coroutine_handle<> consumer) noexcept {
self.handle = consumer;
self.top.promise()._consumer = &self;
return self.top.promise().current_worker;
}
[[nodiscard]] std::add_pointer_t<Yield> await_resume() const noexcept {
return self.current_result; // nullptr if empty
}
};

public:
// * if .empty(), then co_await begin() == end()
// produces next value(often first)
Expand All @@ -308,6 +327,10 @@ struct channel : enable_resource_deduction {
static constexpr std::default_sentinel_t end() noexcept {
return std::default_sentinel;
}

KELCORO_CO_AWAIT_REQUIRED next_awaiter next() noexcept {
return next_awaiter(*this);
}
};

template <yieldable Y, memory_resource R>
Expand Down
117 changes: 117 additions & 0 deletions include/kelcoro/noexcept_task.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
#pragma once

#include "common.hpp"
#include "async_task.hpp"
#include "memory_support.hpp"

/*
same as dd::task, but guaranteed to nothrow (terminate), slightly more effective
*/

namespace dd {

template <typename Result>
struct noexcept_task_promise : return_block<Result> {
std::coroutine_handle<void> who_waits;

static constexpr std::suspend_always initial_suspend() noexcept {
return {};
}
auto get_return_object() {
return std::coroutine_handle<noexcept_task_promise<Result>>::from_promise(*this);
}
[[noreturn]] static void unhandled_exception() noexcept {
std::terminate();
}
auto final_suspend() noexcept {
// who_waits always setted because task not started or co_awaited
return transfer_control_to{who_waits};
}
};

// single value generator that returns a value with a co_return
template <typename Result>
struct noexcept_task : enable_resource_deduction {
using result_type = Result;
using promise_type = noexcept_task_promise<Result>;
using handle_type = std::coroutine_handle<promise_type>;

private:
handle_type handle_;

public:
constexpr noexcept_task() noexcept = default;
constexpr noexcept_task(handle_type handle) noexcept : handle_(handle) {
}

noexcept_task(noexcept_task&& other) noexcept : handle_(std::exchange(other.handle_, nullptr)) {
}
noexcept_task& operator=(noexcept_task&& other) noexcept {
std::swap(handle_, other.handle_);
return *this;
}

~noexcept_task() {
if (handle_)
handle_.destroy();
}
// returns true if task cant be co_awaited
constexpr bool empty() const noexcept {
return handle_ == nullptr || handle_.done();
}
constexpr explicit operator bool() const noexcept {
return !empty();
}
[[nodiscard]] handle_type release() noexcept {
return std::exchange(handle_, nullptr);
}

// blocking
result_type get() noexcept {
assert(!empty());
return [](noexcept_task t) -> async_task<result_type> { co_return co_await t; }(std::move(*this)).get();
}

private:
struct remember_waiter_and_start_task_t {
handle_type task_handle;

static bool await_ready() noexcept {
return false;
}
KELCORO_ASSUME_NOONE_SEES std::coroutine_handle<void> await_suspend(
std::coroutine_handle<void> handle) const noexcept {
task_handle.promise().who_waits = handle;
// symmetric transfer control to task
return task_handle;
}
[[nodiscard]] std::add_rvalue_reference_t<result_type> await_resume() noexcept {
return task_handle.promise().result();
}
};

public:
constexpr auto operator co_await() noexcept {
assert(!empty());
return remember_waiter_and_start_task_t{handle_};
}
};

template <typename Ret, memory_resource R>
using noexcept_task_r = resourced<noexcept_task<Ret>, R>;

namespace pmr {

template <typename Ret>
using noexcept_task = ::dd::noexcept_task_r<Ret, polymorphic_resource>;

}

template <typename R>
struct operation_hash<std::coroutine_handle<noexcept_task_promise<R>>> {
operation_hash_t operator()(std::coroutine_handle<noexcept_task_promise<R>> handle) noexcept {
return operation_hash<std::coroutine_handle<>>()(handle.promise().who_waits);
}
};

} // namespace dd
16 changes: 4 additions & 12 deletions include/kelcoro/noexport/thread_pool_monitoring.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ struct monitoring_t {
std::atomic_size_t strands_count = 0;
// count of pop_all from queue
std::atomic_size_t pop_count = 0;
std::atomic_size_t sleep_count = 0;

monitoring_t() = default;
// copied only when one thread
Expand All @@ -29,8 +28,7 @@ struct monitoring_t {
finished(other.finished.load(relaxed)),
cancelled(other.cancelled.load(relaxed)),
strands_count(other.strands_count.load(relaxed)),
pop_count(other.pop_count.load(relaxed)),
sleep_count(other.sleep_count.load(relaxed)) {
pop_count(other.pop_count.load(relaxed)) {
}
// all calculations approximate
using enum std::memory_order;
Expand All @@ -44,22 +42,16 @@ struct monitoring_t {
// order to never produce value < 0
return pushed - finished;
}
static float sleep_percent(size_t pop_count, size_t sleep_count) noexcept {
assert(pop_count >= sleep_count);
return static_cast<float>(sleep_count) / pop_count;
}

void print(auto&& out) const {
size_t p = pushed, f = finished, sc = strands_count, pc = pop_count, slc = sleep_count,
slp = sleep_percent(pc, slc), avr_tp = average_tasks_popped(pc, f), pending = pending_count(p, f),
cld = cancelled;
size_t p = pushed, f = finished, sc = strands_count, pc = pop_count, avr_tp = average_tasks_popped(pc, f),
pending = pending_count(p, f), cld = cancelled;
// clang-format off
out << "pushed: " << p << '\n';
out << "finished: " << f << '\n';
out << "cancelled: " << cld << '\n';
out << "strands_count: " << sc << '\n';
out << "pop_count: " << pc << '\n';
out << "sleep_count: " << slc << '\n';
out << "sleep%: " << slp << '\n';
out << "average tasks popped: " << avr_tp << '\n';
out << "pending count: " << pending << '\n';
// clang-format on
Expand Down
23 changes: 19 additions & 4 deletions include/kelcoro/task.hpp
Original file line number Diff line number Diff line change
@@ -1,21 +1,24 @@
#pragma once

#include "common.hpp"
#include "kelcoro/async_task.hpp"
#include "memory_support.hpp"

namespace dd {

template <typename Result>
struct task_promise : return_block<Result> {
std::coroutine_handle<void> who_waits;
std::exception_ptr exception = nullptr;

static constexpr std::suspend_always initial_suspend() noexcept {
return {};
}
auto get_return_object() {
return std::coroutine_handle<task_promise<Result>>::from_promise(*this);
}
[[noreturn]] void unhandled_exception() const noexcept {
std::terminate();
void unhandled_exception() noexcept {
exception = std::current_exception();
}
auto final_suspend() noexcept {
// who_waits always setted because task not started or co_awaited
Expand Down Expand Up @@ -50,13 +53,22 @@ struct task : enable_resource_deduction {
handle_.destroy();
}
// returns true if task cant be co_awaited
bool empty() const noexcept {
constexpr bool empty() const noexcept {
return handle_ == nullptr || handle_.done();
}
constexpr explicit operator bool() const noexcept {
return !empty();
}
[[nodiscard]] handle_type release() noexcept {
return std::exchange(handle_, nullptr);
}

// blocking
result_type get() {
assert(!empty());
return [](task t) -> async_task<result_type> { co_return co_await t; }(std::move(*this)).get();
}

private:
struct remember_waiter_and_start_task_t {
handle_type task_handle;
Expand All @@ -71,7 +83,10 @@ struct task : enable_resource_deduction {
return task_handle;
}
[[nodiscard]] std::add_rvalue_reference_t<result_type> await_resume() {
return task_handle.promise().result();
auto& promise = task_handle.promise();
if (promise.exception) [[unlikely]]
std::rethrow_exception(promise.exception);
return promise.result();
}
};

Expand Down
25 changes: 11 additions & 14 deletions include/kelcoro/thread_pool.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ static auto cancel_tasks(task_node* top) noexcept {
KELCORO_MONITORING(return count);
}

} // namespace dd::noexport

namespace dd {

struct task_queue {
private:
task_node* first = nullptr;
Expand Down Expand Up @@ -66,11 +70,10 @@ struct task_queue {

// blocking
// postcondition: task_node != nullptr
[[nodiscard]] task_node* pop_all_not_empty(KELCORO_MONITORING(bool& sleeped)) {
[[nodiscard]] task_node* pop_all_not_empty() {
task_node* nodes;
{
std::unique_lock l(mtx);
KELCORO_MONITORING(sleeped = !first);
while (!first)
not_empty.wait(l);
nodes = pop_all_nolock();
Expand All @@ -80,10 +83,6 @@ struct task_queue {
}
};

} // namespace dd::noexport

namespace dd {

// schedules execution of 'foo' to executor 'e'
[[maybe_unused]] job schedule_to(auto& e KELCORO_LIFETIMEBOUND, auto foo) {
if (!co_await jump_on(e)) [[unlikely]]
Expand All @@ -101,7 +100,7 @@ template <memory_resource R>
// expensive to create
struct worker {
private:
noexport::task_queue queue;
task_queue queue;
KELCORO_MONITORING(monitoring_t mon);
std::thread thread;

Expand Down Expand Up @@ -171,7 +170,7 @@ struct strand {

// distributes tasks among workers
// co_await jump_on(pool) schedules coroutine to thread pool
// note: when thread pool dies, all pending tasks invoked with errc::cancelled
// note: when thread pool dies, all pending tasks invoked with schedule_errc::cancelled
struct thread_pool {
private:
worker* workers; // invariant: != 0
Expand All @@ -185,7 +184,7 @@ struct thread_pool {
}

explicit thread_pool(size_t thread_count = default_thread_count(),
std::pmr::memory_resource* r = std::pmr::new_delete_resource());
std::pmr::memory_resource* r = std::pmr::get_default_resource());

~thread_pool() {
// if destructor started, then it is undefined behavior to push tasks
Expand Down Expand Up @@ -278,11 +277,9 @@ inline void worker::worker_job(worker* w) noexcept {
assert(w);
task_node* top;
std::coroutine_handle task;
noexport::task_queue* queue = &w->queue;
KELCORO_MONITORING(bool sleeped);
while (true) {
top = queue->pop_all_not_empty(KELCORO_MONITORING(sleeped));
KELCORO_MONITORING(if (sleeped) KELCORO_MONITORING_INC(w->mon.sleep_count));
task_queue* queue = &w->queue;
for (;;) {
top = queue->pop_all_not_empty();
KELCORO_MONITORING_INC(w->mon.pop_count);
assert(top);
do {
Expand Down
Loading

0 comments on commit 9d26aa9

Please sign in to comment.