Skip to content

Commit

Permalink
abc
Browse files Browse the repository at this point in the history
  • Loading branch information
kelbon committed Aug 6, 2023
1 parent 29f1e21 commit a792cc3
Show file tree
Hide file tree
Showing 4 changed files with 162 additions and 88 deletions.
99 changes: 41 additions & 58 deletions include/channel.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,22 @@

namespace dd {

// TODO запретить next на rvalue канале? И запретить && на rvalue next!
// тогда юзер всегда если не даун будет писать
// channel c = foo();
// auto next = c.next();
// while (auto* p = co_await next)
// но хотелось бы конечно приятнее как то + обработка исключений. Мб запихнуть под макрос
// logic and behavior is very similar to generator, but its async(may suspend before co_yield)
// ... HMM вообще говоря динамический goto вполне решает проблему... и ифа больше не будет

// TODO для таски и канала добавить возможность запустить их синхронно, но это явно unsafe.. Хотя это
// эквивалентно просто резуму хендла
// TODO создать generator просто одним ифом в channel, это же одно и то же. Различие в обработке исключений,
// запрете co_await и наличии итератора(т.к. существует гарантия, что синхронно)

// TODO exception into consumer?
template <typename>
struct channel;

struct nothing_t {
explicit nothing_t() = default;
};
// may be yielded from channel to produce nullptr on caller side without ending an channel.
// for example you can produce 'null terminated' sequences of values with this
// or create an empty but never done channel
constexpr inline nothing_t nothing{};

template <typename Yield>
struct channel_promise : enable_memory_resource_support {
static_assert(!std::is_reference_v<Yield>);
Expand All @@ -30,22 +30,26 @@ struct channel_promise : enable_memory_resource_support {
friend channel_promise;
friend channel<Yield>;

Yield* current_result;
Yield* current_result = nullptr;
std::coroutine_handle<> handle;
handle_type top;
std::coroutine_handle<> _top; // maybe always done coro or handle_type

constexpr consumer_t(handle_type top) noexcept : current_result(nullptr), handle(nullptr), top(top) {
constexpr consumer_t(std::coroutine_handle<> top) noexcept : _top(top) {
assume_not_null(top);
}
handle_type top() const noexcept {
return handle_type::from_address(_top.address());
}

public:
bool await_ready() const noexcept {
return top.done();
return _top.done();
}
std::coroutine_handle<void> await_suspend(std::coroutine_handle<void> consumer) noexcept {
handle = consumer;
top.promise().consumer = this;
return top.promise().current_worker;
auto& p = top().promise();
p.consumer = this;
return p.current_worker;
}
[[nodiscard]] Yield* await_resume() const noexcept {
return current_result;
Expand All @@ -56,14 +60,6 @@ struct channel_promise : enable_memory_resource_support {
handle_type current_worker = get_return_object();
// nullptr means top-level
handle_type owner = nullptr;
// stores unhandled exception, rethrows it
// TODO прокидывается везде через attach leaf await resume, но в самом конце в потребителе
// я не хочу ифы на каждое значение, тут вариант либо visit в каком то виде...(подключение к консумеру)
// потребителя берущего функцию и применяющую ко всему из канала)
// либо второй вариант - в ~next_t делать проверку и прокидывать исключение, но это конечно проблема
// плюс ещё проблема, что исключение тогда может вылететь абы где, а не тогда когда логически оно вылетело
// вариант №3, макрос async for each делающий ровно это, с проверкой после цикла и бросанием
std::exception_ptr exception = nullptr;

channel_promise() = default;

Expand Down Expand Up @@ -92,7 +88,7 @@ struct channel_promise : enable_memory_resource_support {

struct attach_leaf {
// precondition: leaf != nullptr
handle_type leaf;
std::coroutine_handle<> leaf;

bool await_ready() const noexcept {
assume_not_null(leaf);
Expand All @@ -101,16 +97,14 @@ struct channel_promise : enable_memory_resource_support {

public:
std::coroutine_handle<> await_suspend(handle_type owner) const noexcept {
channel_promise& leaf_p = leaf.promise();
channel_promise& leaf_p = handle_type::from_address(leaf.address()).promise();
consumer_t& consumer = *owner.promise().consumer;
leaf_p.consumer = &consumer;
leaf_p.owner = owner;
consumer.top.promise().current_worker = leaf_p.current_worker;
consumer.top().promise().current_worker = leaf_p.current_worker;
return leaf_p.current_worker;
}
constexpr void await_resume() const {
if (leaf.promise().exception)
std::rethrow_exception(leaf.promise().exception);
}
~attach_leaf() {
// make sure compiler, that its destroyed here (end of yield expression)
Expand All @@ -127,6 +121,14 @@ struct channel_promise : enable_memory_resource_support {
std::is_nothrow_copy_constructible_v<Yield>) {
return hold_value_until_resume{Yield(clvalue)};
}
transfer_control_to yield_value(nothing_t) noexcept {
consumer->current_result = nullptr;
return transfer_control_to{consumer->handle};
}
transfer_control_to yield_value(by_ref<Yield> r) noexcept {
consumer->current_result = std::addressof(r.value);
return transfer_control_to{consumer->handle};
}
// attaches leaf channel
// TODO think about generator-leaf
// postcondition: e.rng.empty()
Expand All @@ -136,11 +138,7 @@ struct channel_promise : enable_memory_resource_support {
if constexpr (!std::is_same_v<rng_t, channel<Yield>>)
static_assert(![] {});
if constexpr (std::is_same_v<typename rng_t::value_type, Yield>) {
handle_type h = e.rng.release();
if (!h)
h = []() -> channel<Yield> { co_return; }().release();
assume_not_null(h);
return attach_leaf{h};
return attach_leaf{e.rng.release()};
} else {
auto make_channel = [](auto& r) -> channel<Yield> {
auto next = r.next();
Expand All @@ -149,14 +147,13 @@ struct channel_promise : enable_memory_resource_support {
};
return attach_leaf{make_channel(e.rng).release()};
}
// TODO else if generator/just range
}

static constexpr std::suspend_always initial_suspend() noexcept {
return {};
}
transfer_control_to final_suspend() const noexcept {
consumer->top.promise().current_worker = owner;
consumer->top().promise().current_worker = owner;
consumer->current_result = nullptr; // may be im last
// i dont have value here now, so i ask 'owner' to create it
if (owner) {
Expand All @@ -167,28 +164,24 @@ struct channel_promise : enable_memory_resource_support {
}
static constexpr void return_void() noexcept {
}
void unhandled_exception() noexcept {
exception = std::current_exception();
// TODO check fallback into final suspend
[[noreturn]] static void unhandled_exception() noexcept {
std::terminate();
}

// interface for iterator, used only on top-level generator
// TODO struct next_awaiter { ... }; and maybe .done() lifetimebound!!!

bool done() noexcept {
return get_return_object().done();
}
};

// co_await on empty/ended with exception channel produces nullptr
// co_await on empty channel produces nullptr
template <typename Yield>
struct channel {
using promise_type = channel_promise<Yield>;
using handle_type = std::coroutine_handle<promise_type>;
using value_type = Yield;

private:
handle_type handle;
std::coroutine_handle<> handle = always_done_coroutine();

public:
// postcondition: empty()
Expand All @@ -207,8 +200,8 @@ struct channel {

// postcondition: .empty()
// after this method its caller responsibility to correctly destroy 'handle'
[[nodiscard]] constexpr handle_type release() noexcept {
return std::exchange(handle, nullptr);
[[nodiscard]] constexpr std::coroutine_handle<> release() noexcept {
return std::exchange(handle, always_done_coroutine());
}
// postcondition: .empty()
constexpr void clear() noexcept {
Expand All @@ -222,25 +215,15 @@ struct channel {
// observers

constexpr bool empty() const noexcept {
return !handle || handle.done();
return handle.done();
}
constexpr explicit operator bool() const noexcept {
return !empty();
}

constexpr bool ended_with_exception() const noexcept {
return handle && handle.promise().exception;
}

// TODO consume_all(foo) -> task<pair<decltype(foo), excepion_ptr>>, + move внутрь функции конечно же, если
// не const
// TODO перенести в promise + for(auto c = channel.consumer(); auto* v = co_await c;)
// analogue for 'begin' of ranges
// usage:
// while(auto* x = co_await channel.next())
// while(auto* x = co_await channel.next()) TODO doc
[[nodiscard]] auto next() & noexcept KELCORO_LIFETIMEBOUND {
if (!handle) [[unlikely]]
*this = []() -> channel<Yield> { co_return; }();
assume_not_null(handle);
return typename promise_type::consumer_t(handle);
}
Expand Down
34 changes: 31 additions & 3 deletions include/common.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,18 @@ constexpr decltype(auto) build_awaiter(T&& value) {
return std::forward<T>(value).operator co_await();
}

struct always_done_coroutine_promise : enable_memory_resource_support {
struct always_done_coroutine_promise {
static void* operator new(std::size_t frame_size) {
assert(frame_size < 128 && "hack dont works(");
// there are only one always done coroutine.
// Its always without state, only compiler-inner things
alignas(__STDCPP_DEFAULT_NEW_ALIGNMENT__) static char hack[128];
return hack;
}

static void operator delete(void* ptr, std::size_t frame_size) noexcept {
// noop
}
static constexpr std::suspend_never initial_suspend() noexcept {
return {};
}
Expand Down Expand Up @@ -337,8 +348,10 @@ namespace dd {
// хм, recieve должен на вход генератор чтоли получать... И иметь оператор co_await для канала
// recieve(gen) co_await recieve(channel)

// caller must only use .done() and cast to coroutine_handle<void>
// resume on returned value produces undefined behavior
// returns handle for which
// .done() == true
// .destroy() is noop
// .resume() produces undefined behavior
inline always_done_coroutine_handle always_done_coroutine() noexcept {
return noexport::always_done_coro;
}
Expand All @@ -365,4 +378,19 @@ KELCORO_ALWAYS_INLINE void assume_not_null(std::coroutine_handle<> h) noexcept {
assume(h.address() != nullptr);
}

// tag for yielding from generator/channel by reference.
// This means, if 'value' will be changed by caller it will be
// observable from coroutine
// example:
// int i = 0;
// co_yield ref{i};
// ..here i may be != 0, if caller changed it
template <typename Yield>
struct by_ref {
Yield& value;
};

template <typename Yield>
by_ref(Yield&) -> by_ref<Yield>;

} // namespace dd
38 changes: 13 additions & 25 deletions include/generator.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
#endif

namespace dd {

// TODO generator must be consumer
template <typename>
struct generator;

Expand All @@ -28,7 +28,8 @@ struct generator_promise : enable_memory_resource_support {
// invariant: never nullptr, initialized in first co_yield
Yield* current_result = nullptr;
std::coroutine_handle<> current_worker = get_return_object();
std::coroutine_handle<> owner = std::noop_coroutine();
// nullptr means top-level
handle_type owner = nullptr;

generator_promise() = default;

Expand Down Expand Up @@ -66,26 +67,9 @@ struct generator_promise : enable_memory_resource_support {
return !leaf || leaf.done();
}

private:
void set_root_for_each_leaf_subleaf(generator_promise& root) const noexcept {
// attached leaf already has some leaves and it is 'root' for them.
// may be reached ONLY if co_yield elements_of
// is before producing first value

// invariant: 'leaf' is reachable from 'lowest', they are in same 'stack'
handle_type lowest = handle_type::from_address(leaf.promise().current_worker.address());
while (leaf != lowest) {
lowest.promise().root = &root;
lowest = handle_type::from_address(lowest.promise().owner.address());
}
}

public:
std::coroutine_handle<> await_suspend(handle_type owner) const noexcept {
generator_promise& leaf_p = leaf.promise();
generator_promise& root = *owner.promise().root;
if (leaf_p.current_worker != leaf) [[unlikely]]
set_root_for_each_leaf_subleaf(root);
leaf_p.root = &root;
leaf_p.owner = owner;
root.current_worker = leaf_p.current_worker;
Expand All @@ -104,10 +88,12 @@ struct generator_promise : enable_memory_resource_support {
root->current_result = std::addressof(rvalue);
return {};
}
std::suspend_always yield_value(by_ref<Yield> r) noexcept {
root->current_result = std::addressof(r.value);
return {};
}
hold_value_until_resume yield_value(const Yield& clvalue) noexcept(
std::is_nothrow_copy_constructible_v<Yield>) {
// this overload accepts Yield& too, because if i accept lvalue by ref and on
// call site move out value it will be visible(move_iterator will possbly break code)
return hold_value_until_resume{Yield(clvalue)};
}
// attaches leaf-generator
Expand Down Expand Up @@ -137,12 +123,15 @@ struct generator_promise : enable_memory_resource_support {
transfer_control_to final_suspend() const noexcept {
root->current_worker = owner;
root->current_result = nullptr;
return transfer_control_to{owner}; // noop coro if done
if (owner) {
owner.promise().root = root;
return transfer_control_to{owner};
}
return transfer_control_to{std::noop_coroutine()};
}
static constexpr void return_void() noexcept {
}
[[noreturn]] static void unhandled_exception() {
// TODO ? может ли быть неконсистентное состояние?..
throw;
}

Expand Down Expand Up @@ -193,7 +182,7 @@ struct generator_iterator {
return static_cast<reference>(*handle.promise().current_result);
}
// * after invoking references to value from operator* are invalidated
generator_iterator& operator++() {
generator_iterator& operator++() KELCORO_LIFETIMEBOUND {
assert(!handle.done());
handle.promise().produce_next();
return *this;
Expand Down Expand Up @@ -222,7 +211,6 @@ struct generator {
// postcondition: empty(), 'for' loop produces 0 values
constexpr generator() noexcept = default;
// precondition: 'handle' != nullptr && !handle.done()
// TODO must be unusable for invariant, that generator created from non-started non null handle
constexpr generator(handle_type handle) noexcept : handle(handle) {
assume_not_null(handle);
assume(!handle.done());
Expand Down
Loading

0 comments on commit a792cc3

Please sign in to comment.