Skip to content

Commit

Permalink
support lazy callback (#647)
Browse files Browse the repository at this point in the history
  • Loading branch information
qicosmos authored Mar 25, 2024
1 parent b8e130c commit 83c5dbf
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 4 deletions.
63 changes: 61 additions & 2 deletions include/ylt/coro_io/coro_io.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@

namespace coro_io {

template <typename T>
constexpr inline bool is_lazy_v =
util::is_specialization_v<std::remove_cvref_t<T>, async_simple::coro::Lazy>;

template <typename Arg, typename Derived>
class callback_awaitor_base {
private:
Expand Down Expand Up @@ -395,9 +399,64 @@ async_simple::coro::Lazy<std::pair<
});
}

template <typename T>
inline decltype(auto) select_impl(T &pair) {
using Func = std::tuple_element_t<1, std::remove_cvref_t<T>>;
using ValueType =
typename std::tuple_element_t<0, std::remove_cvref_t<T>>::ValueType;
using return_type = std::invoke_result_t<Func, async_simple::Try<ValueType>>;

auto &callback = std::get<1>(pair);
if constexpr (coro_io::is_lazy_v<return_type>) {
auto executor = std::get<0>(pair).getExecutor();
return std::make_pair(
std::move(std::get<0>(pair)),
[executor, callback = std::move(callback)](auto &&val) {
if (executor) {
callback(std::move(val)).via(executor).start([](auto &&) {
});
}
else {
callback(std::move(val)).start([](auto &&) {
});
}
});
}
else {
return pair;
}
}

template <typename... T>
auto select(T &&...args) {
return async_simple::coro::collectAny(std::forward<T>(args)...);
inline auto select(T &&...args) {
return async_simple::coro::collectAny(select_impl(args)...);
}

template <typename T, typename Callback>
inline auto select(std::vector<T> vec, Callback callback) {
if constexpr (coro_io::is_lazy_v<Callback>) {
std::vector<async_simple::Executor *> executors;
for (auto &lazy : vec) {
executors.push_back(lazy.getExecutor());
}

return async_simple::coro::collectAny(
std::move(vec),
[executors, callback = std::move(callback)](size_t index, auto &&val) {
auto executor = executors[index];
if (executor) {
callback(index, std::move(val)).via(executor).start([](auto &&) {
});
}
else {
callback(index, std::move(val)).start([](auto &&) {
});
}
});
}
else {
return async_simple::coro::collectAny(std::move(vec), std::move(callback));
}
}

template <typename Socket, typename AsioBuffer>
Expand Down
4 changes: 2 additions & 2 deletions include/ylt/thirdparty/async_simple/coro/Collect.h
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ struct CollectAnyAwaiter {
auto count = e->downCount();
if (count == size + 1) {
r->_idx = i;
(*callback)(i, std::move(result));
(void)(*callback)(i, std::move(result));
c.resume();
}
});
Expand Down Expand Up @@ -222,7 +222,7 @@ struct CollectAnyVariadicPairAwaiter {
callback](auto&& res) mutable {
auto count = event->downCount();
if (count == std::tuple_size<InputType>() + 1) {
callback(std::move(res));
(void)callback(std::move(res));
*result = I;
continuation.resume();
}
Expand Down
36 changes: 36 additions & 0 deletions src/coro_io/tests/test_coro_channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,43 @@ async_simple::coro::Lazy<void> test_select_channel() {
}
}

void callback_lazy() {
using namespace async_simple::coro;
auto test0 = []() mutable -> Lazy<int> {
co_return 41;
};

auto test1 = []() mutable -> Lazy<int> {
co_return 42;
};

auto collectAnyLazy = [](auto&&... args) mutable -> Lazy<size_t> {
co_return co_await collectAny(std::move(args)...);
};

syncAwait(
collectAnyLazy(std::pair{test1(), [&](auto&& val) mutable -> Lazy<void> {
CHECK(val.value() == 42);
int r = co_await test0();
int result = r + val.value();
CHECK(result == 83);
}}));

std::vector<Lazy<int>> input;
input.push_back(test1());

auto index = syncAwait(collectAnyLazy(
std::move(input), [&test0](size_t index, auto val) mutable -> Lazy<void> {
CHECK(val.value() == 42);
int r = co_await test0();
int result = r + val.value();
CHECK(result == 83);
}));
CHECK(index == 0);
}

TEST_CASE("test channel send recieve, test select channel and coroutine") {
async_simple::coro::syncAwait(test_coro_channel());
async_simple::coro::syncAwait(test_select_channel());
callback_lazy();
}

0 comments on commit 83c5dbf

Please sign in to comment.