Skip to content

Commit

Permalink
huuge interface upgrade
Browse files Browse the repository at this point in the history
  • Loading branch information
kelbon committed Aug 10, 2023
1 parent 79f4a43 commit d0c5083
Show file tree
Hide file tree
Showing 4 changed files with 222 additions and 179 deletions.
163 changes: 86 additions & 77 deletions include/channel.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,6 @@ struct channel;
template <typename>
struct channel_iterator;

struct nothing_t {
explicit nothing_t() = default;
};
// may be yielded from channel to produce nullptr on caller side without ending an channel.
// for example you can produce 'null terminated' sequences of values with this
// or create an empty but never done channel
constexpr inline nothing_t nothing{};

template <typename Yield>
struct channel_promise : enable_memory_resource_support, not_movable {
static_assert(!std::is_reference_v<Yield>);
Expand All @@ -30,7 +22,7 @@ struct channel_promise : enable_memory_resource_support, not_movable {

// invariant: root != nullptr
channel_promise* root = this;
handle_type current_worker = get_return_object();
handle_type current_worker = self_handle();
// invariant: never nullptr, stores owner for leafs and consumer for top-level channel
union {
channel<Yield>* _consumer; // setted only in root
Expand All @@ -41,14 +33,17 @@ struct channel_promise : enable_memory_resource_support, not_movable {
assert(root != this);
return _owner;
}
std::coroutine_handle<>& consumer() const noexcept {
return root->_consumer->handle;
channel<Yield>* consumer() const noexcept {
return root->_consumer;
}
std::coroutine_handle<>& consumer_handle() const noexcept {
return consumer()->handle;
}
void set_result(Yield* v) const noexcept {
root->_consumer->current_result = v;
consumer()->current_result = v;
}
std::exception_ptr& exception() const noexcept {
return root->_consumer->exception;
return consumer()->exception;
}
void set_exception(std::exception_ptr e) const noexcept {
exception() = e;
Expand All @@ -58,11 +53,15 @@ struct channel_promise : enable_memory_resource_support, not_movable {
constexpr channel_promise() noexcept {
}

handle_type get_return_object() noexcept {
[[gnu::pure]] handle_type self_handle() noexcept {
return handle_type::from_promise(*this);
}
channel<Yield> get_return_object() noexcept {
return channel(self_handle());
}

private:
// TODO reuse in generator
struct hold_value_until_resume {
Yield value;

Expand All @@ -71,70 +70,65 @@ struct channel_promise : enable_memory_resource_support, not_movable {
}
std::coroutine_handle<> await_suspend(handle_type handle) noexcept {
handle.promise().set_result(std::addressof(value));
return handle.promise().consumer();
return handle.promise().consumer_handle();
}
static constexpr void await_resume() noexcept {
}
};

struct attach_leaf {
// precondition: leaf != nullptr
// TODO rename my coroutine handle
coroutine_handle<channel_promise> leaf;
channel<Yield> leaf;

bool await_ready() const noexcept {
return leaf.done();
return leaf.empty();
}

std::coroutine_handle<> await_suspend(handle_type owner) const noexcept {
assert(owner != leaf);
channel_promise& leaf_p = handle_type::from_address(leaf.address()).promise();
channel_promise& owner_p = owner.promise();
leaf_p.current_worker.promise().root = owner_p.root;
assert(owner != leaf.top);
channel_promise& leaf_p = leaf.top.promise();
channel_promise& root_p = *owner.promise().root;
leaf_p.current_worker.promise().root = &root_p;
leaf_p._owner = owner;
owner_p.root->current_worker = leaf_p.current_worker;
root_p.current_worker = leaf_p.current_worker;
return leaf_p.current_worker;
}
constexpr void await_resume() const {
}
~attach_leaf() {
leaf.destroy();
static constexpr void await_resume() noexcept {
}
};

public:
transfer_control_to yield_value(Yield&& rvalue) noexcept {
set_result(std::addressof(rvalue));
return transfer_control_to{consumer()};
return transfer_control_to{consumer_handle()};
}
hold_value_until_resume yield_value(const Yield& clvalue) noexcept(
std::is_nothrow_copy_constructible_v<Yield>) {
return hold_value_until_resume{Yield(clvalue)};
}
transfer_control_to yield_value(nothing_t) noexcept {
transfer_control_to yield_value(terminator_t) noexcept {
set_result(nullptr);
return transfer_control_to{consumer()};
return transfer_control_to{consumer_handle()};
}
transfer_control_to yield_value(by_ref<Yield> r) noexcept {
set_result(std::addressof(r.value));
return transfer_control_to{consumer()};
return transfer_control_to{consumer_handle()};
}
// attaches leaf channel
// postcondition: e.rng.empty()

template <typename X>
attach_leaf yield_value(elements_of<X> e) noexcept {
using rng_t = std::remove_cvref_t<X>;
// TODO for all other ranges too
if constexpr (!std::is_same_v<rng_t, channel<Yield>>)
static_assert(![] {});
if constexpr (std::is_same_v<typename rng_t::value_type, Yield>) {
return attach_leaf{e.rng.release()};
return attach_leaf{std::move(e.rng)};
} else {
auto make_channel = [](auto& r) -> channel<Yield> {
auto next = r.next();
while (auto* x = co_await next)
co_yield Yield(std::move(*x));
};
return attach_leaf{make_channel(e.rng).release()};
return attach_leaf{make_channel(e.rng)};
}
}

Expand All @@ -148,14 +142,26 @@ struct channel_promise : enable_memory_resource_support, not_movable {
return transfer_control_to{owner()};
}
set_result(nullptr);
return transfer_control_to{consumer()};
return transfer_control_to{consumer_handle()};
}
static constexpr void return_void() noexcept {
}
void unhandled_exception() noexcept {
assert(exception() == nullptr);
set_exception(std::current_exception());
root = this; // force final suspend return into consumer
[[gnu::cold]] void unhandled_exception() {
// case when already was exception, its not handled yet and next generated
if (exception() != nullptr) [[unlikely]]
std::terminate();
if (root == this) {
set_result(nullptr);
throw; // after it top is .done() and iteration is over
}
(void)final_suspend(); // up owner(we are done)
// consumer sees nullptr and stop iterating,
// if consumer catches/ignores exception and calls .begin again, he will observe elements from owner,
// effectifelly we will skip failed leaf
set_exception(std::current_exception()); // notify root->consumer about exception
_consumer = root->_consumer; // 'final_suspend' will 'set_result' through root
root = this; // force final suspend return into consumer
// here 'final suspend' sets result to 0 and returns to consumer
}
};

Expand All @@ -166,14 +172,12 @@ struct channel_iterator : not_movable {
channel<Yield>& chan;
friend channel<Yield>;

constexpr channel_iterator(channel<Yield>& c) noexcept : chan(c) {
constexpr explicit channel_iterator(channel<Yield>& c) noexcept : chan(c) {
}

public:
// TODO recieve
// rvalue ref, but never produces const T&&
using reference = std::conditional_t<std::is_const_v<Yield>, Yield&, Yield&&>;
// TODO same for genenrator? там вообще проблемы, немного различается поведение, хотелось бы исправить

// return true if they are attached to same 'channel' object
constexpr bool equivalent(const channel_iterator& other) const noexcept {
return std::addressof(chan) == std::addressof(other.chan);
Expand All @@ -188,7 +192,7 @@ struct channel_iterator : not_movable {
return static_cast<reference>(*chan.current_result);
}
// * after invoking references to value from operator* are invalidated
KELCORO_CO_AWAIT_REQUIRED transfer_control_to operator++() {
KELCORO_CO_AWAIT_REQUIRED transfer_control_to operator++() noexcept {
assert(*this != std::default_sentinel);
return transfer_control_to{chan.top.promise().current_worker};
}
Expand All @@ -197,8 +201,8 @@ struct channel_iterator : not_movable {
// lifetime of this iterator ends and it checks for exception.
// its for performance(no 'if' on each ++, only one for channel)
~channel_iterator() noexcept(false) {
if (chan.exception)
std::rethrow_exception(chan.exception);
if (chan.exception) [[unlikely]]
std::rethrow_exception(chan.take_exception());
}
};

Expand All @@ -210,7 +214,7 @@ struct channel_iterator : not_movable {
template <typename Yield>
struct channel {
using promise_type = channel_promise<Yield>;
using handle_type = coroutine_handle<promise_type>;
using handle_type = std::coroutine_handle<promise_type>;
using value_type = Yield;

private:
Expand All @@ -220,22 +224,29 @@ struct channel {
// Its important for exception handling and better == end(not call .done())
// initialized when first value created(on in final suspend)
Yield* current_result = nullptr;
std::coroutine_handle<> handle = nullptr; // coro in which i exist(setted in co_await on .begin)
handle_type top = always_done_coroutine(); // current top level channel
std::coroutine_handle<> handle = nullptr; // coro in which i exist(setted in co_await on .begin)
coroutine_handle<promise_type> top = always_done_coroutine(); // current top level channel
// invariant: setted only once for one coroutine frame
// if setted, then top may be not done yet
std::exception_ptr exception = nullptr;

// precondition: 'handle' != nullptr, handle does not have other owners
// used from promise::get_return_object
constexpr explicit channel(handle_type top) noexcept : top(top) {
}

public:
// postcondition: empty()
constexpr channel() noexcept = default;
// precondition: 'handle' != nullptr, handle does not have other owners
constexpr channel(std::coroutine_handle<promise_type> top) noexcept : top(top) {
}
// TODO check if nothing yielded ? or just check .done?

constexpr channel(channel&& other) noexcept {
swap(other);
}
constexpr channel& operator=(channel&& other) noexcept {
swap(other);
return *this;
}

void swap(channel& other) noexcept {
std::swap(current_result, other.current_result);
std::swap(handle, other.handle);
Expand All @@ -245,19 +256,22 @@ struct channel {
friend void swap(channel& a, channel& b) noexcept {
a.swap(b);
}
constexpr channel& operator=(channel&& other) noexcept {
swap(other);
return *this;
}

constexpr void reset(handle_type handle) noexcept {
clear();
if (handle)
top = handle;
}
// postcondition: .empty()
// after this method its caller responsibility to correctly destroy 'handle'
[[nodiscard]] constexpr handle_type release() noexcept {
return std::exchange(top, always_done_coroutine());
if (empty())
return nullptr;
return std::exchange(top, always_done_coroutine()).get();
}
// postcondition: .empty()
constexpr void clear() noexcept {
release().destroy();
std::exchange(top, always_done_coroutine()).destroy();
}
~channel() {
clear();
Expand All @@ -272,18 +286,18 @@ struct channel {
return !empty();
}

// returns exception which happens while iterating (or 0)
// postcondition: exception marked as handled
[[nodiscard]] std::exception_ptr take_exception() noexcept {
return std::exchange(exception, nullptr);
}

private:
// TODO unmovable iterator
// TODO старт просто должен резумить первый раз также как в генераторе
struct starter : not_movable {
private:
channel& self;

friend channel;
constexpr starter(channel& c) noexcept : self(c) {
constexpr explicit starter(channel& c) noexcept : self(c) {
}

public:
bool await_ready() const noexcept {
return self.empty();
}
Expand All @@ -309,17 +323,12 @@ struct channel {
}
};

#define KELCORO_DO_UNIQUE_NAME_EXPAND(x, y) x##y
#define KELCORO_DO_UNIQUE_NAME(x, y) KELCORO_DO_UNIQUE_NAME_EXPAND(x, y)
#define KELCORO_UNIQUE_NAME(base) KELCORO_DO_UNIQUE_NAME(base, __LINE__)

// usage example:
// co_foreach(std::string s, mychannel) use(s);
// OR
// co_foreach(YieldType&& x, mychannel) { ..use(std::move(x)).. };
#define co_foreach(VALUE, ... /*CHANNEL, may be expression produces channel*/) \
auto&& KELCORO_UNIQUE_NAME(dd_channel_) = __VA_ARGS__; \
for (auto dd_b_ = co_await KELCORO_UNIQUE_NAME(dd_channel_).begin(); dd_b_ != ::std::default_sentinel; \
co_await ++dd_b_) \
if (VALUE = *dd_b_; true)
#define co_foreach(VARDECL, ... /*CHANNEL, may be expression produces channel*/) \
if (auto&& dd_channel_ = __VA_ARGS__; true) \
for (auto dd_b_ = co_await dd_channel_.begin(); dd_b_ != ::std::default_sentinel; co_await ++dd_b_) \
if (VARDECL = *dd_b_; true)
} // namespace dd
18 changes: 14 additions & 4 deletions include/common.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
#include <cassert>
#include <thread>
#include <memory_resource>

// TODO перенести всякую муть в utility связанную с тредами, опшнлами и тд
#define KELCORO_CO_AWAIT_REQUIRED [[nodiscard("forget co_await?")]]
#ifdef __clang__
#define KELCORO_LIFETIMEBOUND [[clang::lifetimebound]]
Expand Down Expand Up @@ -389,7 +389,12 @@ struct by_ref {
template <typename Yield>
by_ref(Yield&) -> by_ref<Yield>;

// TODO name handle union
// never nullptr, stores always_done_coroutine_handle or std::coroutine_handle<Promise>
// TODO attributes like pure/const/flatten/cold/hot etc
// реально, если забить на мсвц, то можно писать аттрибуты просто через gnu::x и их поймёт кланг тоже...
// TODO убрать макросы тогда нахрен
// TODO проверить комбинацию cold + flatten на unhandled_exception!
template <typename Promise>
struct coroutine_handle {
private:
Expand All @@ -411,12 +416,10 @@ struct coroutine_handle {

std::coroutine_handle<Promise> get() const noexcept {
assert(_h != nullptr);
assert(!_h.done());
return std::coroutine_handle<Promise>::from_address(_h.address());
}
// precondition: not always_done_coroutine stored
Promise& promise() const noexcept {
assert(_h != always_done_coroutine()); // TODO rm
return std::coroutine_handle<Promise>::from_address(_h.address()).promise();
}
static coroutine_handle from_promise(Promise& p) {
Expand Down Expand Up @@ -446,7 +449,6 @@ struct coroutine_handle {
// can be safely used more then 1 time, noop if no coro attached
void destroy() {
_h.destroy();
_h = always_done_coroutine();
}
operator std::coroutine_handle<>() const noexcept {
return _h;
Expand All @@ -459,4 +461,12 @@ struct not_movable {
void operator=(not_movable&&) = delete;
};

struct terminator_t {
explicit terminator_t() = default;
};
// may be yielded from channel to produce nullptr on caller side without ending an channel.
// for example you can produce 'null terminated' sequences of values with this
// or create an empty but never done channel
constexpr inline terminator_t terminator{};

} // namespace dd
Loading

0 comments on commit d0c5083

Please sign in to comment.