From ac54ad04b1ee036ce7884ce41be4fc3c440a1476 Mon Sep 17 00:00:00 2001 From: qicosmos Date: Sat, 16 Mar 2024 09:12:11 +0800 Subject: [PATCH 1/3] [ci][fix]try to fix ci (#626) --- cmake/develop.cmake | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/cmake/develop.cmake b/cmake/develop.cmake index 061f1d7ee..81cf34e37 100644 --- a/cmake/develop.cmake +++ b/cmake/develop.cmake @@ -35,7 +35,12 @@ option(CORO_RPC_USE_OTHER_RPC "coro_rpc extend to support other rpc" OFF) message(STATUS "CORO_RPC_USE_OTHER_RPC: ${CORO_RPC_USE_OTHER_RPC}") # Enable address sanitizer -option(ENABLE_SANITIZER "Enable sanitizer(Debug+Gcc/Clang/AppleClang)" ON) +if (CMAKE_CXX_COMPILER_ID STREQUAL "GNU") + option(ENABLE_SANITIZER "Enable sanitizer(Debug+Gcc/Clang/AppleClang)" OFF) +else() + option(ENABLE_SANITIZER "Enable sanitizer(Debug+Gcc/Clang/AppleClang)" ON) +endif() + if(ENABLE_SANITIZER AND NOT MSVC) if(CMAKE_BUILD_TYPE STREQUAL "Debug") check_asan(HAS_ASAN) From 076fc86a16b919799752e674e44b5c53a46e1f7f Mon Sep 17 00:00:00 2001 From: qicosmos Date: Sat, 16 Mar 2024 10:05:13 +0800 Subject: [PATCH 2/3] update async_simple 6be48e7b3edde61a8a4e7ca432d25a8d9840153c (#628) --- .../thirdparty/async_simple/CMakeLists.txt | 73 -- include/ylt/thirdparty/async_simple/Collect.h | 75 +- include/ylt/thirdparty/async_simple/Common.h | 22 +- .../ylt/thirdparty/async_simple/Executor.h | 209 ++-- include/ylt/thirdparty/async_simple/Future.h | 529 +++++------ .../ylt/thirdparty/async_simple/FutureState.h | 500 +++++----- .../ylt/thirdparty/async_simple/IOExecutor.h | 58 +- .../ylt/thirdparty/async_simple/LocalState.h | 93 +- .../ylt/thirdparty/async_simple/MoveWrapper.h | 2 +- include/ylt/thirdparty/async_simple/Promise.h | 146 +-- include/ylt/thirdparty/async_simple/Traits.h | 29 +- include/ylt/thirdparty/async_simple/Try.h | 2 +- include/ylt/thirdparty/async_simple/Unit.h | 7 +- .../thirdparty/async_simple/coro/Collect.h | 898 +++++++++++------- .../async_simple/coro/ConditionVariable.h | 255 ++--- .../thirdparty/async_simple/coro/CountEvent.h | 60 +- .../async_simple/coro/DetachedCoroutine.h | 55 +- .../thirdparty/async_simple/coro/Dispatch.h | 117 +++ .../async_simple/coro/FutureAwaiter.h | 43 +- .../thirdparty/async_simple/coro/Generator.h | 5 + .../ylt/thirdparty/async_simple/coro/Latch.h | 73 +- .../ylt/thirdparty/async_simple/coro/Lazy.h | 686 +++++++------ .../async_simple/coro/ResumeBySchedule.h | 82 ++ .../thirdparty/async_simple/coro/Semaphore.h | 116 ++- .../async_simple/coro/SharedMutex.h | 17 +- .../ylt/thirdparty/async_simple/coro/Sleep.h | 21 +- .../thirdparty/async_simple/coro/SpinLock.h | 83 +- .../thirdparty/async_simple/coro/SyncAwait.h | 37 +- .../ylt/thirdparty/async_simple/coro/Traits.h | 21 +- .../async_simple/coro/ViaCoroutine.h | 236 +++-- .../async_simple/executors/SimpleExecutor.cpp | 49 - .../async_simple/executors/SimpleExecutor.h | 67 +- .../async_simple/executors/SimpleIOExecutor.h | 213 +++-- .../thirdparty/async_simple/uthread/Async.h | 101 +- .../thirdparty/async_simple/uthread/Await.h | 2 +- .../thirdparty/async_simple/uthread/Collect.h | 86 +- .../thirdparty/async_simple/uthread/Latch.h | 53 +- .../thirdparty/async_simple/uthread/Uthread.h | 96 +- .../async_simple/uthread/internal/thread.cc | 24 +- .../uthread/internal/thread_impl.h | 1 - .../thirdparty/async_simple/util/Condition.h | 38 +- .../ylt/thirdparty/async_simple/util/Queue.h | 137 ++- .../thirdparty/async_simple/util/ThreadPool.h | 279 +++--- .../async_simple/util/move_only_function.h | 410 ++++++++ 44 files changed, 3445 insertions(+), 2661 deletions(-) delete mode 100644 include/ylt/thirdparty/async_simple/CMakeLists.txt create mode 100644 include/ylt/thirdparty/async_simple/coro/Dispatch.h create mode 100644 include/ylt/thirdparty/async_simple/coro/ResumeBySchedule.h delete mode 100644 include/ylt/thirdparty/async_simple/executors/SimpleExecutor.cpp create mode 100644 include/ylt/thirdparty/async_simple/util/move_only_function.h diff --git a/include/ylt/thirdparty/async_simple/CMakeLists.txt b/include/ylt/thirdparty/async_simple/CMakeLists.txt deleted file mode 100644 index 59b774234..000000000 --- a/include/ylt/thirdparty/async_simple/CMakeLists.txt +++ /dev/null @@ -1,73 +0,0 @@ -file(GLOB coro_src "coro/*.cpp") -file(GLOB executors_src "executors/*.cpp") - -if(${UTHREAD}) - file(GLOB uthread_src "uthread/internal/*.cc") - if (CMAKE_BUILD_TYPE STREQUAL "Debug" AND CMAKE_CXX_COMPILER_ID STREQUAL "Clang" AND - CMAKE_CXX_COMPILER_VERSION VERSION_GREATER_EQUAL "13") - endif() -file(GLOB uthread_asm_src "uthread/internal/${CMAKE_SYSTEM_NAME}/${CMAKE_SYSTEM_PROCESSOR}/*.S") -endif() - -file(GLOB headers "*.h") -file(GLOB coro_header "coro/*.h") -file(GLOB executors_header "executors/*.h") -file(GLOB experimental_header "experimental/*.h") -file(GLOB util_header "util/*.h") -if(UTHREAD) - file(GLOB uthread_header "uthread/*.h") - file(GLOB uthread_internal_header "uthread/internal/*.h") -endif() - - -set(SRCS - ${coro_src} - ${executors_src} - ) -if(UTHREAD) - list(APPEND SRCS ${uthread_src}) - list(APPEND SRCS ${uthread_asm_src}) -endif() - -if(NOT CMAKE_CXX_COMPILER_ID MATCHES "MSVC") - add_library(async_simple_static STATIC ${SRCS}) - add_library(async_simple SHARED ${SRCS}) - target_link_libraries(async_simple PUBLIC libasync_simple) - target_link_libraries(async_simple_static PUBLIC libasync_simple) - - set_target_properties(async_simple_static PROPERTIES OUTPUT_NAME "async_simple") - - install(TARGETS async_simple DESTINATION lib/) - install(TARGETS async_simple_static DESTINATION lib/) -else() - add_library(async_simple STATIC ${SRCS}) - target_link_libraries(async_simple PUBLIC libasync_simple) - install(TARGETS async_simple DESTINATION lib/) -endif() - -set_target_properties(async_simple PROPERTIES - VERSION ${PROJECT_VERSION} - SOVERSION ${PROJECT_VERSION_MAJOR}) - -install(FILES ${headers} DESTINATION include/async_simple) -install(FILES ${coro_header} DESTINATION include/async_simple/coro) -install(FILES ${executors_header} DESTINATION include/async_simple/executors) -install(FILES ${experimental_header} DESTINATION include/async_simple/experimental) -install(FILES ${util_header} DESTINATION include/async_simple/util) -if(UTHREAD) - install(FILES ${uthread_header} DESTINATION include/async_simple/uthread) - install(FILES ${uthread_internal_header} DESTINATION include/async_simple/uthread/internal) -endif() - -if (${ASYNC_SIMPLE_ENABLE_TESTS}) - add_subdirectory(test) - add_subdirectory(util/test) - add_subdirectory(coro/test) - add_subdirectory(executors/test) - if(UTHREAD) - add_subdirectory(uthread/test) - endif() -endif() -if (NOT TARGET async_simple::async_simple_header_only) - add_library(async_simple::async_simple_header_only ALIAS libasync_simple) -endif () diff --git a/include/ylt/thirdparty/async_simple/Collect.h b/include/ylt/thirdparty/async_simple/Collect.h index d14c332b2..937b86333 100644 --- a/include/ylt/thirdparty/async_simple/Collect.h +++ b/include/ylt/thirdparty/async_simple/Collect.h @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023, Alibaba Group Holding Limited; + * Copyright (c) 2022, Alibaba Group Holding Limited; * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,14 +17,14 @@ #define ASYNC_SIMPLE_COLLECT_H #include -#include #include #include - #include "async_simple/Common.h" #include "async_simple/Future.h" #include "async_simple/Try.h" +#include + namespace async_simple { // collectAll - collect all the values for a range of futures. @@ -50,48 +50,47 @@ template inline Future::value_type::value_type>>> collectAll(Iterator begin, Iterator end) { - using T = typename std::iterator_traits::value_type::value_type; - size_t n = std::distance(begin, end); + using T = typename std::iterator_traits::value_type::value_type; + size_t n = std::distance(begin, end); - bool allReady = true; - for (auto iter = begin; iter != end; ++iter) { - if (!iter->hasResult()) { - allReady = false; - break; - } - } - if (allReady) { - std::vector> results; - results.reserve(n); + bool allReady = true; for (auto iter = begin; iter != end; ++iter) { - results.push_back(std::move(iter->result())); + if (!iter->hasResult()) { + allReady = false; + break; + } + } + if (allReady) { + std::vector> results; + results.reserve(n); + for (auto iter = begin; iter != end; ++iter) { + results.push_back(std::move(iter->result())); + } + return Future>>(std::move(results)); } - return Future>>(std::move(results)); - } - Promise>> promise; - auto future = promise.getFuture(); + Promise>> promise; + auto future = promise.getFuture(); - struct Context { - Context(size_t n, Promise>> p_) - : results(n), p(std::move(p_)) {} - ~Context() { p.setValue(std::move(results)); } - std::vector> results; - Promise>> p; - }; + struct Context { + Context(size_t n, Promise>> p_) + : results(n), p(std::move(p_)) {} + ~Context() { p.setValue(std::move(results)); } + std::vector> results; + Promise>> p; + }; - auto ctx = std::make_shared(n, std::move(promise)); - for (size_t i = 0; i < n; ++i, ++begin) { - if (begin->hasResult()) { - ctx->results[i] = std::move(begin->result()); - } - else { - begin->setContinuation([ctx, i](Try&& t) mutable { - ctx->results[i] = std::move(t); - }); + auto ctx = std::make_shared(n, std::move(promise)); + for (size_t i = 0; i < n; ++i, ++begin) { + if (begin->hasResult()) { + ctx->results[i] = std::move(begin->result()); + } else { + begin->setContinuation([ctx, i](Try&& t) mutable { + ctx->results[i] = std::move(t); + }); + } } - } - return future; + return future; } } // namespace async_simple diff --git a/include/ylt/thirdparty/async_simple/Common.h b/include/ylt/thirdparty/async_simple/Common.h index 2bd63bebd..bde017e6d 100644 --- a/include/ylt/thirdparty/async_simple/Common.h +++ b/include/ylt/thirdparty/async_simple/Common.h @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023, Alibaba Group Holding Limited; + * Copyright (c) 2022, Alibaba Group Holding Limited; * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -44,6 +44,20 @@ #endif // __SANITIZE_ADDRESS__ #endif // __GNUC__ +#if defined(__alibaba_clang__) && \ + __has_cpp_attribute(ACC::coro_only_destroy_when_complete) +#define CORO_ONLY_DESTROY_WHEN_DONE [[ACC::coro_only_destroy_when_complete]] +#else +#define CORO_ONLY_DESTROY_WHEN_DONE +#endif + +#if defined(__alibaba_clang__) && \ + __has_cpp_attribute(ACC::elideable_after_await) +#define ELIDEABLE_AFTER_AWAIT [[ACC::elideable_after_await]] +#else +#define ELIDEABLE_AFTER_AWAIT +#endif + namespace async_simple { // Different from assert, logicAssert is meaningful in // release mode. logicAssert should be used in case that @@ -52,9 +66,9 @@ namespace async_simple { // a bug in the library. If logicAssert fails, it means // there is a bug in the user code. inline void logicAssert(bool x, const char* errorMsg) { - if (x) - AS_LIKELY { return; } - throw std::logic_error(errorMsg); + if (x) + AS_LIKELY { return; } + throw std::logic_error(errorMsg); } } // namespace async_simple diff --git a/include/ylt/thirdparty/async_simple/Executor.h b/include/ylt/thirdparty/async_simple/Executor.h index 25f0e998c..d1edfedf8 100644 --- a/include/ylt/thirdparty/async_simple/Executor.h +++ b/include/ylt/thirdparty/async_simple/Executor.h @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023, Alibaba Group Holding Limited; + * Copyright (c) 2022, Alibaba Group Holding Limited; * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,24 +20,25 @@ #include #include #include - +#include "async_simple/MoveWrapper.h" #include "async_simple/experimental/coroutine.h" +#include "async_simple/util/move_only_function.h" namespace async_simple { // Stat information for an executor. // It contains the number of pending task // for the executor now. struct ExecutorStat { - size_t pendingTaskCount = 0; - ExecutorStat() = default; + size_t pendingTaskCount = 0; + ExecutorStat() = default; }; // Options for a schedule. // The option contains: // - bool prompt. Whether or not this schedule // should be prompted. struct ScheduleOptions { - bool prompt = true; - ScheduleOptions() = default; + bool prompt = true; + ScheduleOptions() = default; }; // Awaitable to get the current executor. @@ -61,115 +62,119 @@ struct CurrentExecutor {}; class IOExecutor; class Executor { - public: - // Context is an identification for the context where an executor - // should run. See checkin/checkout for details. - using Context = void *; - static constexpr Context NULLCTX = nullptr; - - // A time duration in microseconds. - using Duration = std::chrono::duration; - - // The schedulable function. Func should accept no argument and - // return void. - using Func = std::function; - class TimeAwaitable; - class TimeAwaiter; - - Executor(std::string name = "default") : _name(std::move(name)) {} - virtual ~Executor() {} - - Executor(const Executor &) = delete; - Executor &operator=(const Executor &) = delete; - - // Schedule a function. - // `schedule` would return false if schedule failed, which means function - // func will not be executed. In case schedule return true, the executor - // should guarantee that the func would be executed. - virtual bool schedule(Func func) = 0; - // Return true if caller runs in the executor. - virtual bool currentThreadInExecutor() const { - throw std::logic_error("Not implemented"); - } - virtual ExecutorStat stat() const { - throw std::logic_error("Not implemented"); - } - - // checkout() return current "Context", which defined by executor - // implementation, then checkin(func, "Context") should schedule func to the - // same "Context" as before. - virtual size_t currentContextId() const { return 0; }; - virtual Context checkout() { return NULLCTX; } - virtual bool checkin(Func func, [[maybe_unused]] Context ctx, - [[maybe_unused]] ScheduleOptions opts) { - return schedule(std::move(func)); - } - virtual bool checkin(Func func, Context ctx) { - static ScheduleOptions opts; - return checkin(std::move(func), ctx, opts); - } - - const std::string &name() const { return _name; } - - // Use - // co_await executor.after(sometime) - // to schedule current execution after some time. - TimeAwaitable after(Duration dur); - - // IOExecutor accepts IO read/write requests. - // Return nullptr if the executor doesn't offer an IOExecutor. - virtual IOExecutor *getIOExecutor() { - throw std::logic_error("Not implemented"); - } - - protected: - virtual void schedule(Func func, Duration dur) { - std::thread([this, func = std::move(func), dur]() { - std::this_thread::sleep_for(dur); - schedule(std::move(func)); - }).detach(); - } - - private: - std::string _name; +public: + // Context is an identification for the context where an executor + // should run. See checkin/checkout for details. + using Context = void *; + static constexpr Context NULLCTX = nullptr; + + // A time duration in microseconds. + using Duration = std::chrono::duration; + + // The schedulable function. Func should accept no argument and + // return void. + using Func = std::function; + class TimeAwaitable; + class TimeAwaiter; + + Executor(std::string name = "default") : _name(std::move(name)) {} + virtual ~Executor() {} + + Executor(const Executor &) = delete; + Executor &operator=(const Executor &) = delete; + + // Schedule a function. + // `schedule` would return false if schedule failed, which means function + // func will not be executed. In case schedule return true, the executor + // should guarantee that the func would be executed. + virtual bool schedule(Func func) = 0; + + // Schedule a move only functor + bool schedule_move_only(util::move_only_function func) { + MoveWrapper tmp(std::move(func)); + return schedule([func = tmp]() { func.get()(); }); + } + + // Return true if caller runs in the executor. + virtual bool currentThreadInExecutor() const { + throw std::logic_error("Not implemented"); + } + virtual ExecutorStat stat() const { + throw std::logic_error("Not implemented"); + } + + // checkout() return current "Context", which defined by executor + // implementation, then checkin(func, "Context") should schedule func to the + // same "Context" as before. + virtual size_t currentContextId() const { return 0; }; + virtual Context checkout() { return NULLCTX; } + virtual bool checkin(Func func, [[maybe_unused]] Context ctx, + [[maybe_unused]] ScheduleOptions opts) { + return schedule(std::move(func)); + } + virtual bool checkin(Func func, Context ctx) { + static ScheduleOptions opts; + return checkin(std::move(func), ctx, opts); + } + + const std::string &name() const { return _name; } + + // Use + // co_await executor.after(sometime) + // to schedule current execution after some time. + TimeAwaitable after(Duration dur); + + // IOExecutor accepts IO read/write requests. + // Return nullptr if the executor doesn't offer an IOExecutor. + virtual IOExecutor *getIOExecutor() { + throw std::logic_error("Not implemented"); + } + +protected: + virtual void schedule(Func func, Duration dur) { + std::thread([this, func = std::move(func), dur]() { + std::this_thread::sleep_for(dur); + schedule(std::move(func)); + }).detach(); + } + +private: + std::string _name; }; // Awaiter to implement Executor::after. class Executor::TimeAwaiter { - public: - TimeAwaiter(Executor *ex, Executor::Duration dur) : _ex(ex), _dur(dur) {} - - public: - bool await_ready() const noexcept { return false; } - - template - void await_suspend(std::coroutine_handle continuation) { - std::function func = [c = continuation]() mutable { - c.resume(); - }; - _ex->schedule(func, _dur); - } - void await_resume() const noexcept {} - - private: - Executor *_ex; - Executor::Duration _dur; +public: + TimeAwaiter(Executor *ex, Executor::Duration dur) : _ex(ex), _dur(dur) {} + +public: + bool await_ready() const noexcept { return false; } + + template + void await_suspend(std::coroutine_handle continuation) { + _ex->schedule(std::move(continuation), _dur); + } + void await_resume() const noexcept {} + +private: + Executor *_ex; + Executor::Duration _dur; }; // Awaitable to implement Executor::after. class Executor::TimeAwaitable { - public: - TimeAwaitable(Executor *ex, Executor::Duration dur) : _ex(ex), _dur(dur) {} +public: + TimeAwaitable(Executor *ex, Executor::Duration dur) : _ex(ex), _dur(dur) {} - auto coAwait(Executor *) { return Executor::TimeAwaiter(_ex, _dur); } + auto coAwait(Executor *) { return Executor::TimeAwaiter(_ex, _dur); } - private: - Executor *_ex; - Executor::Duration _dur; +private: + Executor *_ex; + Executor::Duration _dur; }; Executor::TimeAwaitable inline Executor::after(Executor::Duration dur) { - return Executor::TimeAwaitable(this, dur); + return Executor::TimeAwaitable(this, dur); }; } // namespace async_simple diff --git a/include/ylt/thirdparty/async_simple/Future.h b/include/ylt/thirdparty/async_simple/Future.h index c9afa7cfe..014e9578a 100644 --- a/include/ylt/thirdparty/async_simple/Future.h +++ b/include/ylt/thirdparty/async_simple/Future.h @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023, Alibaba Group Holding Limited; + * Copyright (c) 2022, Alibaba Group Holding Limited; * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,7 +17,6 @@ #define ASYNC_SIMPLE_FUTURE_H #include - #include "async_simple/Executor.h" #include "async_simple/FutureState.h" #include "async_simple/LocalState.h" @@ -49,317 +48,313 @@ class Promise; // he should call makeReadyFuture(). template class Future { - private: - // If T is void, the inner_value_type is Unit. It will be used by - // `FutureState` and `LocalState`. Because `Try` cannot distinguish - // between `Nothing` state and `Value` state. - // It maybe remove Unit after next version, and then will change the - // `Try` to distinguish between `Nothing` state and `Value` state - using inner_value_type = std::conditional_t, Unit, T>; - - public: - using value_type = T; - Future(FutureState* fs) : _sharedState(fs) { - if (_sharedState) { - _sharedState->attachOne(); +private: + // If T is void, the inner_value_type is Unit. It will be used by + // `FutureState` and `LocalState`. Because `Try` cannot distinguish + // between `Nothing` state and `Value` state. + // It maybe remove Unit after next version, and then will change the + // `Try` to distinguish between `Nothing` state and `Value` state + using inner_value_type = std::conditional_t, Unit, T>; + +public: + using value_type = T; + + Future(FutureState* fs) : _sharedState(fs) { + if (_sharedState) { + _sharedState->attachOne(); + } } - } - Future(Try&& t) - : _sharedState(nullptr), _localState(std::move(t)) {} + Future(Try&& t) + : _sharedState(nullptr), _localState(std::move(t)) {} - ~Future() { - if (_sharedState) { - _sharedState->detachOne(); + ~Future() { + if (_sharedState) { + _sharedState->detachOne(); + } } - } - Future(const Future&) = delete; - Future& operator=(const Future&) = delete; + Future(const Future&) = delete; + Future& operator=(const Future&) = delete; - Future(Future&& other) - : _sharedState(other._sharedState), - _localState(std::move(other._localState)) { - other._sharedState = nullptr; - } + Future(Future&& other) + : _sharedState(other._sharedState), + _localState(std::move(other._localState)) { + other._sharedState = nullptr; + } - Future& operator=(Future&& other) { - if (this != &other) { - std::swap(_sharedState, other._sharedState); - _localState = std::move(other._localState); + Future& operator=(Future&& other) { + if (this != &other) { + std::swap(_sharedState, other._sharedState); + _localState = std::move(other._localState); + } + return *this; } - return *this; - } - public: - bool valid() const { - return _sharedState != nullptr || _localState.hasResult(); - } + auto coAwait(Executor*) && noexcept { return std::move(*this); } - bool hasResult() const { - return _localState.hasResult() || _sharedState->hasResult(); - } +public: + bool valid() const { + return _sharedState != nullptr || _localState.hasResult(); + } - std::add_rvalue_reference_t value() && { - if constexpr (std::is_void_v) { - return result().value(); + bool hasResult() const { + return _localState.hasResult() || _sharedState->hasResult(); } - else { - return std::move(result().value()); + + std::add_rvalue_reference_t value() && { + if constexpr (std::is_void_v) { + return result().value(); + } else { + return std::move(result().value()); + } } - } - std::add_lvalue_reference_t value() & { return result().value(); } - const std::add_lvalue_reference_t value() const& { - return result().value(); - } - - Try&& result() && requires(!std::is_void_v) { - return std::move(getTry(*this)); - } - Try& result() & requires(!std::is_void_v) { return getTry(*this); } - const Try& result() const& requires(!std::is_void_v) { - return getTry(*this); - } - - Try result() && requires(std::is_void_v) { return getTry(*this); } - Try result() & requires(std::is_void_v) { return getTry(*this); } - Try result() const& requires(std::is_void_v) { - return getTry(*this); - } - - // get is only allowed on rvalue, aka, Future is not valid after get - // invoked. - // - // Get value blocked thread when the future doesn't have a value. - // If future in uthread context, use await(future) to get value without - // thread blocked. - T get() && { - wait(); - return (std::move(*this)).value(); - } - // Implementation for get() to wait synchronously. - void wait() { - logicAssert(valid(), "Future is broken"); - - if (hasResult()) { - return; + std::add_lvalue_reference_t value() & { return result().value(); } + const std::add_lvalue_reference_t value() const& { + return result().value(); } - // wait in the same executor may cause deadlock - assert(!currentThreadInExecutor()); - - // The state is a shared state - Promise promise; - auto future = promise.getFuture(); - - _sharedState->setExecutor( - nullptr); // following continuation is simple, execute inplace - std::mutex mtx; - std::condition_variable cv; - std::atomic done{false}; - _sharedState->setContinuation( - [&mtx, &cv, &done, p = std::move(promise)](Try&& t) mutable { - std::unique_lock lock(mtx); - p.setValue(std::move(t)); - done.store(true, std::memory_order_relaxed); - cv.notify_one(); - }); - std::unique_lock lock(mtx); - cv.wait(lock, [&done]() { - return done.load(std::memory_order_relaxed); - }); - *this = std::move(future); - assert(_sharedState->hasResult()); - } - - // Set the executor for the future. This only works for rvalue. - // So the original future shouldn't be accessed after setting - // an executor. The user should use the returned future instead. - Future via(Executor* executor) && { - setExecutor(executor); - Future ret(std::move(*this)); - return ret; - } - - // thenTry() is only allowed on rvalues, do not access a future after - // thenTry() called. F is a callback function which takes Try&& as - // parameter. - // - template > - Future thenTry(F&& f) && { - return thenImpl(std::forward(f)); - } - - // Similar to thenTry, but F takes a T&&. If exception throws, F will not be - // called. - template > - Future thenValue(F&& f) && { - auto lambda = [func = std::forward(f)](Try&& t) mutable { - if constexpr (std::is_void_v) { - t.value(); - return std::forward(func)(); - } - else { - return std::forward(func)(std::move(t).value()); - }; - }; - using Func = decltype(lambda); - return thenImpl>(std::move(lambda)); - } - - template , - ValueCallableResult, TryCallableResult>> - Future then(F&& f) && { - if constexpr (std::is_invocable_v) { - return std::move(*this).thenValue(std::forward(f)); + Try&& result() && requires(!std::is_void_v) { + return std::move(getTry(*this)); + } + Try& result() & requires(!std::is_void_v) { return getTry(*this); } + const Try& result() const& requires(!std::is_void_v) { + return getTry(*this); } - else { - return std::move(*this).thenTry(std::forward(f)); + + Try result() && requires(std::is_void_v) { return getTry(*this); } + Try result() & requires(std::is_void_v) { return getTry(*this); } + Try result() const& requires(std::is_void_v) { + return getTry(*this); } - } - - public: - // This section is public because they may invoked by other type of Future. - // They are not suppose to be public. - // FIXME: mark the section as private. - void setExecutor(Executor* ex) { - if (_sharedState) { - _sharedState->setExecutor(ex); + + // get is only allowed on rvalue, aka, Future is not valid after get + // invoked. + // + // Get value blocked thread when the future doesn't have a value. + // If future in uthread context, use await(future) to get value without + // thread blocked. + T get() && { + wait(); + return (std::move(*this)).value(); } - else { - _localState.setExecutor(ex); + // Implementation for get() to wait synchronously. + void wait() { + logicAssert(valid(), "Future is broken"); + + if (hasResult()) { + return; + } + + // wait in the same executor may cause deadlock + assert(!currentThreadInExecutor()); + + // The state is a shared state + Promise promise; + auto future = promise.getFuture(); + + _sharedState->setExecutor( + nullptr); // following continuation is simple, execute inplace + std::mutex mtx; + std::condition_variable cv; + std::atomic done{false}; + _sharedState->setContinuation( + [&mtx, &cv, &done, p = std::move(promise)](Try&& t) mutable { + std::unique_lock lock(mtx); + p.setValue(std::move(t)); + done.store(true, std::memory_order_relaxed); + cv.notify_one(); + }); + std::unique_lock lock(mtx); + cv.wait(lock, + [&done]() { return done.load(std::memory_order_relaxed); }); + *this = std::move(future); + assert(_sharedState->hasResult()); } - } - Executor* getExecutor() { - if (_sharedState) { - return _sharedState->getExecutor(); + // Set the executor for the future. This only works for rvalue. + // So the original future shouldn't be accessed after setting + // an executor. The user should use the returned future instead. + Future via(Executor* executor) && { + setExecutor(executor); + Future ret(std::move(*this)); + return ret; } - else { - return _localState.getExecutor(); + + // thenTry() is only allowed on rvalues, do not access a future after + // thenTry() called. F is a callback function which takes Try&& as + // parameter. + // + template > + Future thenTry(F&& f) && { + return thenImpl(std::forward(f)); } - } - template - void setContinuation(F&& func) { - assert(valid()); - if (_sharedState) { - _sharedState->setContinuation(std::forward(func)); + // Similar to thenTry, but F takes a T&&. If exception throws, F will not be + // called. + template > + Future thenValue(F&& f) && { + auto lambda = [func = std::forward(f)](Try&& t) mutable { + if constexpr (std::is_void_v) { + t.value(); + return std::forward(func)(); + } else { + return std::forward(func)(std::move(t).value()); + }; + }; + using Func = decltype(lambda); + return thenImpl>(std::move(lambda)); } - else { - _localState.setContinuation(std::forward(func)); + + template , + ValueCallableResult, + TryCallableResult>> + Future then(F&& f) && { + if constexpr (std::is_invocable_v) { + return std::move(*this).thenValue(std::forward(f)); + } else { + return std::move(*this).thenTry(std::forward(f)); + } } - } - bool currentThreadInExecutor() const { - assert(valid()); - if (_sharedState) { - return _sharedState->currentThreadInExecutor(); +public: + // This section is public because they may invoked by other type of Future. + // They are not suppose to be public. + // FIXME: mark the section as private. + void setExecutor(Executor* ex) { + if (_sharedState) { + _sharedState->setExecutor(ex); + } else { + _localState.setExecutor(ex); + } } - else { - return _localState.currentThreadInExecutor(); + + Executor* getExecutor() { + if (_sharedState) { + return _sharedState->getExecutor(); + } else { + return _localState.getExecutor(); + } } - } - - bool TEST_hasLocalState() const { return _localState.hasResult(); } - - private: - template - static decltype(auto) getTry(Clazz& self) { - logicAssert(self.valid(), "Future is broken"); - logicAssert(self._localState.hasResult() || self._sharedState->hasResult(), - "Future is not ready"); - if (self._sharedState) { - return self._sharedState->getTry(); + + template + void setContinuation(F&& func) { + assert(valid()); + if (_sharedState) { + _sharedState->setContinuation(std::forward(func)); + } else { + _localState.setContinuation(std::forward(func)); + } } - else { - return self._localState.getTry(); + + bool currentThreadInExecutor() const { + assert(valid()); + if (_sharedState) { + return _sharedState->currentThreadInExecutor(); + } else { + return _localState.currentThreadInExecutor(); + } } - } - - // continuation returns a future - template - Future thenImpl(F&& func) { - logicAssert(valid(), "Future is broken"); - using T2 = typename R::ReturnsFuture::Inner; - - if (!_sharedState) { - if constexpr (R::ReturnsFuture::value) { - try { - auto newFuture = - std::forward(func)(std::move(_localState.getTry())); - if (!newFuture.getExecutor()) { - newFuture.setExecutor(_localState.getExecutor()); - } - return newFuture; - } catch (...) { - return Future(Try(std::current_exception())); + + bool TEST_hasLocalState() const { return _localState.hasResult(); } + +private: + template + static decltype(auto) getTry(Clazz& self) { + logicAssert(self.valid(), "Future is broken"); + logicAssert( + self._localState.hasResult() || self._sharedState->hasResult(), + "Future is not ready"); + if (self._sharedState) { + return self._sharedState->getTry(); + } else { + return self._localState.getTry(); } - } - else { - Future newFuture(makeTryCall(std::forward(func), - std::move(_localState.getTry()))); - newFuture.setExecutor(_localState.getExecutor()); - return newFuture; - } } - Promise promise; - auto newFuture = promise.getFuture(); - newFuture.setExecutor(_sharedState->getExecutor()); - _sharedState->setContinuation( - [p = std::move(promise), - f = std::forward(func)](Try&& t) mutable { - if (!R::isTry && t.hasError()) { - p.setException(t.getException()); - } - else { + // continuation returns a future + template + Future thenImpl(F&& func) { + logicAssert(valid(), "Future is broken"); + using T2 = typename R::ReturnsFuture::Inner; + + if (!_sharedState) { if constexpr (R::ReturnsFuture::value) { - try { - auto f2 = f(std::move(t)); - f2.setContinuation([pm = std::move(p)](Try&& t2) mutable { - pm.setValue(std::move(t2)); - }); - } catch (...) { - p.setException(std::current_exception()); - } + try { + auto newFuture = + std::forward(func)(std::move(_localState.getTry())); + if (!newFuture.getExecutor()) { + newFuture.setExecutor(_localState.getExecutor()); + } + return newFuture; + } catch (...) { + return Future(Try(std::current_exception())); + } + } else { + Future newFuture(makeTryCall( + std::forward(func), std::move(_localState.getTry()))); + newFuture.setExecutor(_localState.getExecutor()); + return newFuture; } - else { - p.setValue(makeTryCall(std::forward(f), - std::move(t))); // Try - } - } - }); - return newFuture; - } - - private: - FutureState* _sharedState; - - // Ready-Future does not have a Promise, an inline state is faster. - LocalState _localState; - - private: - template - friend Future::value_type::value_type>>> - collectAll(Iter begin, Iter end); + } + + Promise promise; + auto newFuture = promise.getFuture(); + newFuture.setExecutor(_sharedState->getExecutor()); + _sharedState->setContinuation( + [p = std::move(promise), + f = std::forward(func)](Try&& t) mutable { + if (!R::isTry && t.hasError()) { + p.setException(t.getException()); + } else { + if constexpr (R::ReturnsFuture::value) { + try { + auto f2 = f(std::move(t)); + f2.setContinuation( + [pm = std::move(p)](Try&& t2) mutable { + pm.setValue(std::move(t2)); + }); + } catch (...) { + p.setException(std::current_exception()); + } + } else { + p.setValue(makeTryCall(std::forward(f), + std::move(t))); // Try + } + } + }); + return newFuture; + } + +private: + FutureState* _sharedState; + + // Ready-Future does not have a Promise, an inline state is faster. + LocalState _localState; + +private: + template + friend Future::value_type::value_type>>> + collectAll(Iter begin, Iter end); }; // Make a ready Future template Future makeReadyFuture(T&& v) { - return Future(Try(std::forward(v))); + return Future(Try(std::forward(v))); } template Future makeReadyFuture(Try&& t) { - return Future(std::move(t)); + return Future(std::move(t)); } template Future makeReadyFuture(std::exception_ptr ex) { - return Future(Try(ex)); + return Future(Try(ex)); +} +inline Future makeReadyFuture() { + return Future(Try(Unit())); } -inline Future makeReadyFuture() { return Future(Try()); } } // namespace async_simple diff --git a/include/ylt/thirdparty/async_simple/FutureState.h b/include/ylt/thirdparty/async_simple/FutureState.h index 9df81f44e..7f4958399 100644 --- a/include/ylt/thirdparty/async_simple/FutureState.h +++ b/include/ylt/thirdparty/async_simple/FutureState.h @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023, Alibaba Group Holding Limited; + * Copyright (c) 2022, Alibaba Group Holding Limited; * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,18 +17,18 @@ #define ASYNC_SIMPLE_FUTURESTATE_H #include +#include + #include #include #include -#include #include #include #include - #include "async_simple/Common.h" #include "async_simple/Executor.h" -#include "async_simple/MoveWrapper.h" #include "async_simple/Try.h" +#include "async_simple/util/move_only_function.h" namespace async_simple { @@ -37,18 +37,18 @@ namespace async_simple { namespace detail { enum class State : uint8_t { - START = 0, - ONLY_RESULT = 1 << 0, - ONLY_CONTINUATION = 1 << 1, - DONE = 1 << 5, + START = 0, + ONLY_RESULT = 1 << 0, + ONLY_CONTINUATION = 1 << 1, + DONE = 1 << 5, }; constexpr State operator|(State lhs, State rhs) { - return State((uint8_t)lhs | (uint8_t)rhs); + return State((uint8_t)lhs | (uint8_t)rhs); } constexpr State operator&(State lhs, State rhs) { - return State((uint8_t)lhs & (uint8_t)rhs); + return State((uint8_t)lhs & (uint8_t)rhs); } } // namespace detail @@ -61,270 +61,276 @@ constexpr State operator&(State lhs, State rhs) { // Users should **never** use FutureState directly. template class FutureState { - private: - using Continuation = std::function&& value)>; - - private: - // A helper to help FutureState to count the references to guarantee - // that the memory get released correctly. - class ContinuationReference { - public: - ContinuationReference() = default; - explicit ContinuationReference(FutureState* fs) : _fs(fs) { attach(); } - ~ContinuationReference() { detach(); } - - ContinuationReference(const ContinuationReference& other) : _fs(other._fs) { - attach(); - } - ContinuationReference& operator=(const ContinuationReference& other) = - delete; +private: + using Continuation = util::move_only_function&& value)>; + +private: + // A helper to help FutureState to count the references to guarantee + // that the memory get released correctly. + class ContinuationReference { + public: + ContinuationReference() = default; + explicit ContinuationReference(FutureState* fs) : _fs(fs) { + attach(); + } + ~ContinuationReference() { detach(); } - ContinuationReference(ContinuationReference&& other) : _fs(other._fs) { - other._fs = nullptr; - } + ContinuationReference(const ContinuationReference& other) + : _fs(other._fs) { + attach(); + } + ContinuationReference& operator=(const ContinuationReference& other) = + delete; - ContinuationReference& operator=(ContinuationReference&& other) = delete; + ContinuationReference(ContinuationReference&& other) : _fs(other._fs) { + other._fs = nullptr; + } + + ContinuationReference& operator=(ContinuationReference&& other) = + delete; - FutureState* getFutureState() const noexcept { return _fs; } + FutureState* getFutureState() const noexcept { return _fs; } + + private: + void attach() { + if (_fs) { + _fs->attachOne(); + _fs->refContinuation(); + } + } + void detach() { + if (_fs) { + _fs->derefContinuation(); + _fs->detachOne(); + } + } - private: - void attach() { - if (_fs) { - _fs->attachOne(); - _fs->refContinuation(); - } + private: + FutureState* _fs = nullptr; + }; + +public: + FutureState() + : _state(detail::State::START), + _attached(0), + _continuationRef(0), + _executor(nullptr), + _context(Executor::NULLCTX), + _promiseRef(0), + _forceSched(false) {} + ~FutureState() {} + + FutureState(const FutureState&) = delete; + FutureState& operator=(const FutureState&) = delete; + + FutureState(FutureState&& other) = delete; + FutureState& operator=(FutureState&&) = delete; + +public: + bool hasResult() const noexcept { + constexpr auto allow = detail::State::DONE | detail::State::ONLY_RESULT; + auto state = _state.load(std::memory_order_acquire); + return (state & allow) != detail::State(); } - void detach() { - if (_fs) { - _fs->derefContinuation(); - _fs->detachOne(); - } + + bool hasContinuation() const noexcept { + constexpr auto allow = + detail::State::DONE | detail::State::ONLY_CONTINUATION; + auto state = _state.load(std::memory_order_acquire); + return (state & allow) != detail::State(); } - private: - FutureState* _fs = nullptr; - }; - - public: - FutureState() - : _state(detail::State::START), - _attached(0), - _continuationRef(0), - _executor(nullptr), - _context(Executor::NULLCTX), - _promiseRef(0), - _forceSched(false) {} - ~FutureState() {} - - FutureState(const FutureState&) = delete; - FutureState& operator=(const FutureState&) = delete; - - FutureState(FutureState&& other) = delete; - FutureState& operator=(FutureState&&) = delete; - - public: - bool hasResult() const noexcept { - constexpr auto allow = detail::State::DONE | detail::State::ONLY_RESULT; - auto state = _state.load(std::memory_order_acquire); - return (state & allow) != detail::State(); - } - - bool hasContinuation() const noexcept { - constexpr auto allow = - detail::State::DONE | detail::State::ONLY_CONTINUATION; - auto state = _state.load(std::memory_order_acquire); - return (state & allow) != detail::State(); - } - - AS_INLINE void attachOne() { - _attached.fetch_add(1, std::memory_order_relaxed); - } - AS_INLINE void detachOne() { - auto old = _attached.fetch_sub(1, std::memory_order_acq_rel); - assert(old >= 1u); - if (old == 1) { - delete this; + AS_INLINE void attachOne() { + _attached.fetch_add(1, std::memory_order_relaxed); + } + AS_INLINE void detachOne() { + auto old = _attached.fetch_sub(1, std::memory_order_acq_rel); + assert(old >= 1u); + if (old == 1) { + delete this; + } } - } - AS_INLINE void attachPromise() { - _promiseRef.fetch_add(1, std::memory_order_relaxed); - attachOne(); - } - AS_INLINE void detachPromise() { - auto old = _promiseRef.fetch_sub(1, std::memory_order_acq_rel); - assert(old >= 1u); - if (!hasResult() && old == 1) { - try { - throw std::runtime_error("Promise is broken"); - } catch (...) { - setResult(Try(std::current_exception())); - } + AS_INLINE void attachPromise() { + _promiseRef.fetch_add(1, std::memory_order_relaxed); + attachOne(); + } + AS_INLINE void detachPromise() { + auto old = _promiseRef.fetch_sub(1, std::memory_order_acq_rel); + assert(old >= 1u); + if (!hasResult() && old == 1) { + try { + throw std::runtime_error("Promise is broken"); + } catch (...) { + setResult(Try(std::current_exception())); + } + } + detachOne(); } - detachOne(); - } - public: - Try& getTry() noexcept { return _try_value; } - const Try& getTry() const noexcept { return _try_value; } +public: + Try& getTry() noexcept { return _try_value; } + const Try& getTry() const noexcept { return _try_value; } - void setExecutor(Executor* ex) { _executor = ex; } + void setExecutor(Executor* ex) { _executor = ex; } - Executor* getExecutor() { return _executor; } + Executor* getExecutor() { return _executor; } - void checkout() { - if (_executor) { - _context = _executor->checkout(); + void checkout() { + if (_executor) { + _context = _executor->checkout(); + } } - } - void setForceSched(bool force = true) { - if (!_executor && force) { - std::cerr << "executor is nullptr, can not set force schedule " - "continaution\n"; - return; + void setForceSched(bool force = true) { + if (!_executor && force) { + std::cerr << "executor is nullptr, can not set force schedule " + "continaution\n"; + return; + } + _forceSched = force; } - _forceSched = force; - } - - public: - // State Transfer: - // START: initial - // ONLY_RESULT: promise.setValue called - // ONLY_CONTINUATION: future.thenImpl called - void setResult(Try&& value) { + +public: + // State Transfer: + // START: initial + // ONLY_RESULT: promise.setValue called + // ONLY_CONTINUATION: future.thenImpl called + void setResult(Try&& value) { #if !defined(__GNUC__) || __GNUC__ < 12 - // GCC 12 issues a spurious uninitialized-var warning. - // See details: https://gcc.gnu.org/bugzilla/show_bug.cgi?id=109448 - logicAssert(!hasResult(), "FutureState already has a result"); + // GCC 12 issues a spurious uninitialized-var warning. + // See details: https://gcc.gnu.org/bugzilla/show_bug.cgi?id=109448 + logicAssert(!hasResult(), "FutureState already has a result"); #endif - _try_value = std::move(value); - - auto state = _state.load(std::memory_order_acquire); - switch (state) { - case detail::State::START: - if (_state.compare_exchange_strong(state, detail::State::ONLY_RESULT, - std::memory_order_release)) { - return; - } - // state has already transfered, fallthrough - assert(_state.load(std::memory_order_relaxed) == - detail::State::ONLY_CONTINUATION); - case detail::State::ONLY_CONTINUATION: - if (_state.compare_exchange_strong(state, detail::State::DONE, - std::memory_order_release)) { - scheduleContinuation(false); - return; + _try_value = std::move(value); + + auto state = _state.load(std::memory_order_acquire); + switch (state) { + case detail::State::START: + if (_state.compare_exchange_strong(state, + detail::State::ONLY_RESULT, + std::memory_order_release)) { + return; + } + // state has already transfered, fallthrough + assert(_state.load(std::memory_order_relaxed) == + detail::State::ONLY_CONTINUATION); + case detail::State::ONLY_CONTINUATION: + if (_state.compare_exchange_strong(state, detail::State::DONE, + std::memory_order_release)) { + scheduleContinuation(false); + return; + } + default: + logicAssert(false, "State Transfer Error"); } - default: - logicAssert(false, "State Transfer Error"); } - } - - template - void setContinuation(F&& func) { - logicAssert(!hasContinuation(), "FutureState already has a continuation"); - MoveWrapper lambdaFunc(std::move(func)); - new (&_continuation) Continuation([lambdaFunc](Try&& v) mutable { - auto& lambda = lambdaFunc.get(); - lambda(std::forward>(v)); - }); - - auto state = _state.load(std::memory_order_acquire); - switch (state) { - case detail::State::START: - if (_state.compare_exchange_strong(state, - detail::State::ONLY_CONTINUATION, - std::memory_order_release)) { - return; + + template + void setContinuation(F&& func) { + logicAssert(!hasContinuation(), + "FutureState already has a continuation"); + new (&_continuation) + Continuation([func = std::move(func)](Try&& v) mutable { + func(std::forward>(v)); + }); + + auto state = _state.load(std::memory_order_acquire); + switch (state) { + case detail::State::START: + if (_state.compare_exchange_strong( + state, detail::State::ONLY_CONTINUATION, + std::memory_order_release)) { + return; + } + // state has already transferred, fallthrough + assert(_state.load(std::memory_order_relaxed) == + detail::State::ONLY_RESULT); + case detail::State::ONLY_RESULT: + if (_state.compare_exchange_strong(state, detail::State::DONE, + std::memory_order_release)) { + scheduleContinuation(true); + return; + } + default: + logicAssert(false, "State Transfer Error"); } - // state has already transferred, fallthrough - assert(_state.load(std::memory_order_relaxed) == - detail::State::ONLY_RESULT); - case detail::State::ONLY_RESULT: - if (_state.compare_exchange_strong(state, detail::State::DONE, - std::memory_order_release)) { - scheduleContinuation(true); - return; + } + + bool currentThreadInExecutor() const { + if (!_executor) { + return false; } - default: - logicAssert(false, "State Transfer Error"); + return _executor->currentThreadInExecutor(); } - } - bool currentThreadInExecutor() const { - if (!_executor) { - return false; +private: + void scheduleContinuation(bool triggerByContinuation) { + logicAssert( + _state.load(std::memory_order_relaxed) == detail::State::DONE, + "FutureState is not DONE"); + if (!_forceSched && (!_executor || triggerByContinuation || + currentThreadInExecutor())) { + // execute inplace for better performance + ContinuationReference guard(this); + _continuation(std::move(_try_value)); + } else { + ContinuationReference guard(this); + ContinuationReference guardForException(this); + try { + bool ret; + if (Executor::NULLCTX == _context) { + ret = _executor->schedule( + [fsRef = std::move(guard)]() mutable { + auto ref = std::move(fsRef); + auto fs = ref.getFutureState(); + fs->_continuation(std::move(fs->_try_value)); + }); + } else { + ScheduleOptions opts; + opts.prompt = !_forceSched; + // schedule continuation in the same context before + // checkout() + ret = _executor->checkin( + [fsRef = std::move(guard)]() mutable { + auto ref = std::move(fsRef); + auto fs = ref.getFutureState(); + fs->_continuation(std::move(fs->_try_value)); + }, + _context, opts); + } + if (!ret) { + throw std::runtime_error( + "schedule continuation in executor failed"); + } + } catch (std::exception& e) { + // reschedule failed, execute inplace + _continuation(std::move(_try_value)); + } + } } - return _executor->currentThreadInExecutor(); - } - - private: - void scheduleContinuation(bool triggerByContinuation) { - logicAssert(_state.load(std::memory_order_relaxed) == detail::State::DONE, - "FutureState is not DONE"); - if (!_forceSched && - (!_executor || triggerByContinuation || currentThreadInExecutor())) { - // execute inplace for better performance - ContinuationReference guard(this); - _continuation(std::move(_try_value)); + + void refContinuation() { + _continuationRef.fetch_add(1, std::memory_order_relaxed); } - else { - ContinuationReference guard(this); - ContinuationReference guardForException(this); - try { - bool ret; - if (Executor::NULLCTX == _context) { - ret = _executor->schedule([fsRef = std::move(guard)]() mutable { - auto ref = std::move(fsRef); - auto fs = ref.getFutureState(); - fs->_continuation(std::move(fs->_try_value)); - }); - } - else { - ScheduleOptions opts; - opts.prompt = !_forceSched; - // schedule continuation in the same context before - // checkout() - ret = _executor->checkin( - [fsRef = std::move(guard)]() mutable { - auto ref = std::move(fsRef); - auto fs = ref.getFutureState(); - fs->_continuation(std::move(fs->_try_value)); - }, - _context, opts); + void derefContinuation() { + auto old = _continuationRef.fetch_sub(1, std::memory_order_relaxed); + assert(old >= 1); + if (old == 1) { + _continuation.~Continuation(); } - if (!ret) { - throw std::runtime_error("schedule continuation in executor failed"); - } - } catch (std::exception& e) { - // reschedule failed, execute inplace - _continuation(std::move(_try_value)); - } - } - } - - void refContinuation() { - _continuationRef.fetch_add(1, std::memory_order_relaxed); - } - void derefContinuation() { - auto old = _continuationRef.fetch_sub(1, std::memory_order_relaxed); - assert(old >= 1); - if (old == 1) { - _continuation.~Continuation(); } - } - - private: - std::atomic _state; - std::atomic _attached; - std::atomic _continuationRef; - Try _try_value; - union { - Continuation _continuation; - }; - Executor* _executor; - Executor::Context _context; - std::atomic _promiseRef; - bool _forceSched; + +private: + std::atomic _state; + std::atomic _attached; + std::atomic _continuationRef; + Try _try_value; + union { + Continuation _continuation; + }; + Executor* _executor; + Executor::Context _context; + std::atomic _promiseRef; + bool _forceSched; }; } // namespace async_simple diff --git a/include/ylt/thirdparty/async_simple/IOExecutor.h b/include/ylt/thirdparty/async_simple/IOExecutor.h index cffdf32e4..4facbbce3 100644 --- a/include/ylt/thirdparty/async_simple/IOExecutor.h +++ b/include/ylt/thirdparty/async_simple/IOExecutor.h @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023, Alibaba Group Holding Limited; + * Copyright (c) 2022, Alibaba Group Holding Limited; * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,29 +24,29 @@ namespace async_simple { // IOExecutor accepts and performs io requests, callers will be notified by // callback. IO type and arguments are similar to Linux AIO. enum iocb_cmd { - IOCB_CMD_PREAD = 0, - IOCB_CMD_PWRITE = 1, - IOCB_CMD_FSYNC = 2, - IOCB_CMD_FDSYNC = 3, - /* These two are experimental. - * IOCB_CMD_PREADX = 4, - * IOCB_CMD_POLL = 5, - */ - IOCB_CMD_NOOP = 6, - IOCB_CMD_PREADV = 7, - IOCB_CMD_PWRITEV = 8, + IOCB_CMD_PREAD = 0, + IOCB_CMD_PWRITE = 1, + IOCB_CMD_FSYNC = 2, + IOCB_CMD_FDSYNC = 3, + /* These two are experimental. + * IOCB_CMD_PREADX = 4, + * IOCB_CMD_POLL = 5, + */ + IOCB_CMD_NOOP = 6, + IOCB_CMD_PREADV = 7, + IOCB_CMD_PWRITEV = 8, }; struct io_event_t { - void* data; - void* obj; - uint64_t res; - uint64_t res2; + void* data; + void* obj; + uint64_t res; + uint64_t res2; }; struct iovec_t { - void* iov_base; - size_t iov_len; + void* iov_base; + size_t iov_len; }; using AIOCallback = std::function; @@ -55,20 +55,20 @@ using AIOCallback = std::function; // After the user implements an IOExecutor, he should associate // the IOExecutor with the corresponding Executor implementation. class IOExecutor { - public: - using Func = std::function; +public: + using Func = std::function; - IOExecutor() {} - virtual ~IOExecutor() {} + IOExecutor() {} + virtual ~IOExecutor() {} - IOExecutor(const IOExecutor&) = delete; - IOExecutor& operator=(const IOExecutor&) = delete; + IOExecutor(const IOExecutor&) = delete; + IOExecutor& operator=(const IOExecutor&) = delete; - public: - virtual void submitIO(int fd, iocb_cmd cmd, void* buffer, size_t length, - off_t offset, AIOCallback cbfn) = 0; - virtual void submitIOV(int fd, iocb_cmd cmd, const iovec_t* iov, size_t count, - off_t offset, AIOCallback cbfn) = 0; +public: + virtual void submitIO(int fd, iocb_cmd cmd, void* buffer, size_t length, + off_t offset, AIOCallback cbfn) = 0; + virtual void submitIOV(int fd, iocb_cmd cmd, const iovec_t* iov, + size_t count, off_t offset, AIOCallback cbfn) = 0; }; } // namespace async_simple diff --git a/include/ylt/thirdparty/async_simple/LocalState.h b/include/ylt/thirdparty/async_simple/LocalState.h index be916e01a..297de380c 100644 --- a/include/ylt/thirdparty/async_simple/LocalState.h +++ b/include/ylt/thirdparty/async_simple/LocalState.h @@ -1,6 +1,6 @@ /* - * Copyright (c) 2023, Alibaba Group Holding Limited; + * Copyright (c) 2022, Alibaba Group Holding Limited; * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,16 +18,15 @@ #define ASYNC_SIMPLE_LOCALSTATE_H #include + #include #include #include #include #include #include - #include "async_simple/Common.h" #include "async_simple/Executor.h" -#include "async_simple/MoveWrapper.h" #include "async_simple/Try.h" namespace async_simple { @@ -39,57 +38,57 @@ namespace async_simple { // Users should never use LocalState directly. template class LocalState { - private: - using Continuation = std::function&& value)>; - - public: - LocalState() : _executor(nullptr) {} - LocalState(T&& v) : _try_value(std::forward(v)), _executor(nullptr) {} - LocalState(Try&& t) : _try_value(std::move(t)), _executor(nullptr) {} - - ~LocalState() {} - - LocalState(const LocalState&) = delete; - LocalState& operator=(const LocalState&) = delete; - - LocalState(LocalState&& other) - : _try_value(std::move(other._try_value)), - _executor(std::exchange(other._executor, nullptr)) {} - LocalState& operator=(LocalState&& other) { - if (this != &other) { - std::swap(_try_value, other._try_value); - std::swap(_executor, other._executor); +private: + using Continuation = std::function&& value)>; + +public: + LocalState() : _executor(nullptr) {} + LocalState(T&& v) : _try_value(std::forward(v)), _executor(nullptr) {} + LocalState(Try&& t) : _try_value(std::move(t)), _executor(nullptr) {} + + ~LocalState() {} + + LocalState(const LocalState&) = delete; + LocalState& operator=(const LocalState&) = delete; + + LocalState(LocalState&& other) + : _try_value(std::move(other._try_value)), + _executor(std::exchange(other._executor, nullptr)) {} + LocalState& operator=(LocalState&& other) { + if (this != &other) { + std::swap(_try_value, other._try_value); + std::swap(_executor, other._executor); + } + return *this; } - return *this; - } - public: - bool hasResult() const noexcept { return _try_value.available(); } +public: + bool hasResult() const noexcept { return _try_value.available(); } - public: - Try& getTry() noexcept { return _try_value; } - const Try& getTry() const noexcept { return _try_value; } +public: + Try& getTry() noexcept { return _try_value; } + const Try& getTry() const noexcept { return _try_value; } - void setExecutor(Executor* ex) { _executor = ex; } + void setExecutor(Executor* ex) { _executor = ex; } - Executor* getExecutor() { return _executor; } + Executor* getExecutor() { return _executor; } - bool currentThreadInExecutor() const { - if (!_executor) { - return false; + bool currentThreadInExecutor() const { + if (!_executor) { + return false; + } + return _executor->currentThreadInExecutor(); } - return _executor->currentThreadInExecutor(); - } - - template - void setContinuation(F&& f) { - assert(_try_value.available()); - std::forward(f)(std::move(_try_value)); - } - - private: - Try _try_value; - Executor* _executor; + + template + void setContinuation(F&& f) { + assert(_try_value.available()); + std::forward(f)(std::move(_try_value)); + } + +private: + Try _try_value; + Executor* _executor; }; } // namespace async_simple diff --git a/include/ylt/thirdparty/async_simple/MoveWrapper.h b/include/ylt/thirdparty/async_simple/MoveWrapper.h index 698039a3d..d75c9c1f1 100644 --- a/include/ylt/thirdparty/async_simple/MoveWrapper.h +++ b/include/ylt/thirdparty/async_simple/MoveWrapper.h @@ -25,7 +25,7 @@ namespace async_simple { // std::function requre copyConstructable, hence we provide MoveWrapper perform // copy as move. template -class MoveWrapper { +class [[deprecated]] MoveWrapper { public: MoveWrapper() = default; MoveWrapper(T&& value) : _value(std::move(value)) {} diff --git a/include/ylt/thirdparty/async_simple/Promise.h b/include/ylt/thirdparty/async_simple/Promise.h index 216eae557..083e840ca 100644 --- a/include/ylt/thirdparty/async_simple/Promise.h +++ b/include/ylt/thirdparty/async_simple/Promise.h @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023, Alibaba Group Holding Limited; + * Copyright (c) 2022, Alibaba Group Holding Limited; * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,7 +17,6 @@ #define ASYNC_SIMPLE_PROMISE_H #include - #include "async_simple/Common.h" #include "async_simple/Future.h" @@ -34,87 +33,88 @@ class Future; // we could call setException(). template class Promise { - public: - using value_type = std::conditional_t, Unit, T>; - Promise() : _sharedState(new FutureState()), _hasFuture(false) { - _sharedState->attachPromise(); - } - ~Promise() { - if (_sharedState) { - _sharedState->detachPromise(); +public: + using value_type = std::conditional_t, Unit, T>; + Promise() : _sharedState(new FutureState()), _hasFuture(false) { + _sharedState->attachPromise(); + } + ~Promise() { + if (_sharedState) { + _sharedState->detachPromise(); + } } - } - Promise(const Promise& other) { - _sharedState = other._sharedState; - _hasFuture = other._hasFuture; - _sharedState->attachPromise(); - } - Promise& operator=(const Promise& other) { - if (this == &other) { - return *this; + Promise(const Promise& other) { + _sharedState = other._sharedState; + _hasFuture = other._hasFuture; + _sharedState->attachPromise(); + } + Promise& operator=(const Promise& other) { + if (this == &other) { + return *this; + } + this->~Promise(); + _sharedState = other._sharedState; + _hasFuture = other._hasFuture; + _sharedState->attachPromise(); + return *this; } - this->~Promise(); - _sharedState = other._sharedState; - _hasFuture = other._hasFuture; - _sharedState->attachPromise(); - return *this; - } - Promise(Promise&& other) - : _sharedState(other._sharedState), _hasFuture(other._hasFuture) { - other._sharedState = nullptr; - other._hasFuture = false; - } - Promise& operator=(Promise&& other) { - std::swap(_sharedState, other._sharedState); - std::swap(_hasFuture, other._hasFuture); - return *this; - } + Promise(Promise&& other) + : _sharedState(other._sharedState), _hasFuture(other._hasFuture) { + other._sharedState = nullptr; + other._hasFuture = false; + } + Promise& operator=(Promise&& other) { + std::swap(_sharedState, other._sharedState); + std::swap(_hasFuture, other._hasFuture); + return *this; + } - public: - Future getFuture() { - logicAssert(valid(), "Promise is broken"); - logicAssert(!_hasFuture, "Promise already has a future"); - _hasFuture = true; - return Future(_sharedState); - } - bool valid() const { return _sharedState != nullptr; } - // make the continuation back to origin context - Promise& checkout() { - if (_sharedState) { - _sharedState->checkout(); +public: + Future getFuture() { + logicAssert(valid(), "Promise is broken"); + logicAssert(!_hasFuture, "Promise already has a future"); + _hasFuture = true; + return Future(_sharedState); } - return *this; - } - Promise& forceSched() { - if (_sharedState) { - _sharedState->setForceSched(); + bool valid() const { return _sharedState != nullptr; } + // make the continuation back to origin context + Promise& checkout() { + if (_sharedState) { + _sharedState->checkout(); + } + return *this; + } + Promise& forceSched() { + if (_sharedState) { + _sharedState->setForceSched(); + } + return *this; } - return *this; - } - public: - void setException(std::exception_ptr error) { - logicAssert(valid(), "Promise is broken"); - _sharedState->setResult(Try(error)); - } - void setValue(value_type&& v) requires(!std::is_void_v) { - logicAssert(valid(), "Promise is broken"); - _sharedState->setResult(Try(std::forward(v))); - } - void setValue(Try&& t) { - logicAssert(valid(), "Promise is broken"); - _sharedState->setResult(std::move(t)); - } +public: + void setException(std::exception_ptr error) { + logicAssert(valid(), "Promise is broken"); + _sharedState->setResult(Try(error)); + } + void setValue(value_type&& v) requires(!std::is_void_v) { + logicAssert(valid(), "Promise is broken"); + _sharedState->setResult(Try(std::forward(v))); + } + void setValue(Try&& t) { + logicAssert(valid(), "Promise is broken"); + _sharedState->setResult(std::move(t)); + } - void setValue() requires(std::is_void_v) { - logicAssert(valid(), "Promise is broken"); - _sharedState->setResult(Try(Unit())); - } + void setValue() requires(std::is_void_v) { + logicAssert(valid(), "Promise is broken"); + _sharedState->setResult(Try(Unit())); + } - private : FutureState* _sharedState = nullptr; - bool _hasFuture = false; +private: + FutureState* _sharedState = nullptr; + bool _hasFuture = false; }; } // namespace async_simple diff --git a/include/ylt/thirdparty/async_simple/Traits.h b/include/ylt/thirdparty/async_simple/Traits.h index 15322fbf7..60b32a09a 100644 --- a/include/ylt/thirdparty/async_simple/Traits.h +++ b/include/ylt/thirdparty/async_simple/Traits.h @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023, Alibaba Group Holding Limited; + * Copyright (c) 2022, Alibaba Group Holding Limited; * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,7 +17,6 @@ #define ASYNC_SIMPLE_TRAITS_H #include - #include "async_simple/Common.h" #include "async_simple/Try.h" #include "async_simple/Unit.h" @@ -29,40 +28,40 @@ class Future; template struct IsFuture : std::false_type { - using Inner = T; + using Inner = T; }; template struct IsFuture> : std::true_type { - using Inner = T; + using Inner = T; }; template struct TryCallableResult { - using Result = std::invoke_result_t&&>; - using ReturnsFuture = IsFuture; - static constexpr bool isTry = true; + using Result = std::invoke_result_t&&>; + using ReturnsFuture = IsFuture; + static constexpr bool isTry = true; }; template struct ValueCallableResult { - using Result = std::invoke_result_t; - using ReturnsFuture = IsFuture; - static constexpr bool isTry = false; + using Result = std::invoke_result_t; + using ReturnsFuture = IsFuture; + static constexpr bool isTry = false; }; template struct ValueCallableResult { - using Result = std::invoke_result_t; - using ReturnsFuture = IsFuture; - static constexpr bool isTry = false; + using Result = std::invoke_result_t; + using ReturnsFuture = IsFuture; + static constexpr bool isTry = false; }; namespace detail { template struct remove_cvref { - using type = - typename std::remove_cv::type>::type; + using type = + typename std::remove_cv::type>::type; }; template using remove_cvref_t = typename remove_cvref::type; diff --git a/include/ylt/thirdparty/async_simple/Try.h b/include/ylt/thirdparty/async_simple/Try.h index 6fa2be339..8ec401893 100644 --- a/include/ylt/thirdparty/async_simple/Try.h +++ b/include/ylt/thirdparty/async_simple/Try.h @@ -118,7 +118,7 @@ class Try { } std::exception_ptr getException() const { logicAssert(std::holds_alternative(_value), - "Try object do not has on error"); + "Try object do not has an error"); return std::get(_value); } diff --git a/include/ylt/thirdparty/async_simple/Unit.h b/include/ylt/thirdparty/async_simple/Unit.h index f623c9eef..4217282d9 100644 --- a/include/ylt/thirdparty/async_simple/Unit.h +++ b/include/ylt/thirdparty/async_simple/Unit.h @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023, Alibaba Group Holding Limited; + * Copyright (c) 2022, Alibaba Group Holding Limited; * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,7 +17,6 @@ #define ASYNC_SIMPLE_UNIT_H #include - #include "async_simple/Common.h" #include "async_simple/Try.h" @@ -28,8 +27,8 @@ namespace async_simple { // // User shouldn't use this directly. struct Unit { - constexpr bool operator==(const Unit&) const { return true; } - constexpr bool operator!=(const Unit&) const { return false; } + constexpr bool operator==(const Unit&) const { return true; } + constexpr bool operator!=(const Unit&) const { return false; } }; } // namespace async_simple diff --git a/include/ylt/thirdparty/async_simple/coro/Collect.h b/include/ylt/thirdparty/async_simple/coro/Collect.h index c0db35d38..218cd7b43 100644 --- a/include/ylt/thirdparty/async_simple/coro/Collect.h +++ b/include/ylt/thirdparty/async_simple/coro/Collect.h @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023, Alibaba Group Holding Limited; + * Copyright (c) 2022, Alibaba Group Holding Limited; * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,9 +24,9 @@ #include #include #include - #include "async_simple/Common.h" #include "async_simple/Try.h" +#include "async_simple/Unit.h" #include "async_simple/coro/CountEvent.h" #include "async_simple/coro/Lazy.h" #include "async_simple/experimental/coroutine.h" @@ -47,273 +47,404 @@ namespace detail { template struct CollectAnyResult { - CollectAnyResult() : _idx(static_cast(-1)), _value() {} - CollectAnyResult(size_t idx, std::add_rvalue_reference_t value) requires( - !std::is_void_v) - : _idx(idx), _value(std::move(value)) {} - - CollectAnyResult(const CollectAnyResult&) = delete; - CollectAnyResult& operator=(const CollectAnyResult&) = delete; - CollectAnyResult(CollectAnyResult&& other) - : _idx(std::move(other._idx)), _value(std::move(other._value)) { - other._idx = static_cast(-1); - } - - size_t _idx; - Try _value; - - size_t index() const { return _idx; } - - bool hasError() const { return _value.hasError(); } - // Require hasError() == true. Otherwise it is UB to call - // this method. - std::exception_ptr getException() const { return _value.getException(); } - - // Require hasError() == false. Otherwise it is UB to call - // value() method. + CollectAnyResult() : _idx(static_cast(-1)), _value() {} + CollectAnyResult(size_t idx, std::add_rvalue_reference_t value) requires( + !std::is_void_v) + : _idx(idx), _value(std::move(value)) {} + + CollectAnyResult(const CollectAnyResult&) = delete; + CollectAnyResult& operator=(const CollectAnyResult&) = delete; + CollectAnyResult(CollectAnyResult&& other) + : _idx(std::move(other._idx)), _value(std::move(other._value)) { + other._idx = static_cast(-1); + } + + size_t _idx; + Try _value; + + size_t index() const { return _idx; } + + bool hasError() const { return _value.hasError(); } + // Require hasError() == true. Otherwise it is UB to call + // this method. + std::exception_ptr getException() const { return _value.getException(); } + + // Require hasError() == false. Otherwise it is UB to call + // value() method. #if __cpp_explicit_this_parameter >= 202110L - template - auto&& value(this Self&& self) { - return std::forward(self)._value.value(); - } + template + auto&& value(this Self&& self) { + return std::forward(self)._value.value(); + } #else - const T& value() const& { return _value.value(); } - T& value() & { return _value.value(); } - T&& value() && { return std::move(_value).value(); } - const T&& value() const&& { return std::move(_value).value(); } + const T& value() const& { return _value.value(); } + T& value() & { return _value.value(); } + T&& value() && { return std::move(_value).value(); } + const T&& value() const&& { return std::move(_value).value(); } #endif }; -template +template struct CollectAnyAwaiter { - using ValueType = typename LazyType::ValueType; - using ResultType = CollectAnyResult; - - CollectAnyAwaiter(std::vector&& input) - : _input(std::move(input)), _result(nullptr) {} - - CollectAnyAwaiter(const CollectAnyAwaiter&) = delete; - CollectAnyAwaiter& operator=(const CollectAnyAwaiter&) = delete; - CollectAnyAwaiter(CollectAnyAwaiter&& other) - : _input(std::move(other._input)), _result(std::move(other._result)) {} - - bool await_ready() const noexcept { - return _input.empty() || - (_result && _result->_idx != static_cast(-1)); - } - - void await_suspend(std::coroutine_handle<> continuation) { - auto promise_type = std::coroutine_handle::from_address( - continuation.address()) - .promise(); - auto executor = promise_type._executor; - // we should take care of input's life-time after resume. - std::vector input(std::move(_input)); - // Make local copies to shared_ptr to avoid deleting objects too early - // if any coroutine finishes before this function. - auto result = std::make_shared(); - auto event = std::make_shared(input.size()); - - _result = result; - for (size_t i = 0; - i < input.size() && (result->_idx == static_cast(-1)); ++i) { - if (!input[i]._coro.promise()._executor) { - input[i]._coro.promise()._executor = executor; - } - - input[i].start([i = i, size = input.size(), r = result, c = continuation, - e = event](Try&& result) mutable { - assert(e != nullptr); - auto count = e->downCount(); - if (count == size + 1) { - r->_idx = i; - r->_value = std::move(result); - c.resume(); + using ValueType = typename LazyType::ValueType; + using ResultType = CollectAnyResult; + + CollectAnyAwaiter(std::vector&& input) + : _input(std::move(input)), _result(nullptr) {} + + CollectAnyAwaiter(std::vector&& input, Callback callback) + : _input(std::move(input)), + _result(nullptr), + _callback(std::move(callback)) {} + + CollectAnyAwaiter(const CollectAnyAwaiter&) = delete; + CollectAnyAwaiter& operator=(const CollectAnyAwaiter&) = delete; + CollectAnyAwaiter(CollectAnyAwaiter&& other) + : _input(std::move(other._input)), + _result(std::move(other._result)), + _callback(std::move(other._callback)) {} + + bool await_ready() const noexcept { + return _input.empty() || + (_result && _result->_idx != static_cast(-1)); + } + + void await_suspend(std::coroutine_handle<> continuation) { + auto promise_type = + std::coroutine_handle::from_address( + continuation.address()) + .promise(); + auto executor = promise_type._executor; + // we should take care of input's life-time after resume. + std::vector input(std::move(_input)); + // Make local copies to shared_ptr to avoid deleting objects too early + // if any coroutine finishes before this function. + auto result = std::make_shared(); + auto event = std::make_shared(input.size()); + auto callback = std::move(_callback); + + _result = result; + for (size_t i = 0; + i < input.size() && (result->_idx == static_cast(-1)); + ++i) { + if (!input[i]._coro.promise()._executor) { + input[i]._coro.promise()._executor = executor; + } + + if constexpr (std::is_same_v) { + (void)callback; + input[i].start([i, size = input.size(), r = result, + c = continuation, + e = event](Try&& result) mutable { + assert(e != nullptr); + auto count = e->downCount(); + if (count == size + 1) { + r->_idx = i; + r->_value = std::move(result); + c.resume(); + } + }); + } else { + input[i].start([i, size = input.size(), r = result, + c = continuation, e = event, + callback](Try&& result) mutable { + assert(e != nullptr); + auto count = e->downCount(); + if (count == size + 1) { + r->_idx = i; + (*callback)(i, std::move(result)); + c.resume(); + } + }); + } + } // end for + } + auto await_resume() { + if constexpr (std::is_same_v) { + assert(_result != nullptr); + return std::move(*_result); + } else { + return _result->index(); } - }); - } // end for - } - auto await_resume() { - assert(_result != nullptr); - return std::move(*_result); - } - - std::vector _input; - std::shared_ptr _result; + } + + std::vector _input; + std::shared_ptr _result; + [[no_unique_address]] Callback _callback; +}; + +template +struct CollectAnyVariadicPairAwaiter { + using InputType = std::tuple; + + CollectAnyVariadicPairAwaiter(Ts&&... inputs) + : _input(std::move(inputs)...), _result(nullptr) {} + + CollectAnyVariadicPairAwaiter(InputType&& inputs) + : _input(std::move(inputs)), _result(nullptr) {} + + CollectAnyVariadicPairAwaiter(const CollectAnyVariadicPairAwaiter&) = + delete; + CollectAnyVariadicPairAwaiter& operator=( + const CollectAnyVariadicPairAwaiter&) = delete; + CollectAnyVariadicPairAwaiter(CollectAnyVariadicPairAwaiter&& other) + : _input(std::move(other._input)), _result(std::move(other._result)) {} + + bool await_ready() const noexcept { + return _result && _result->has_value(); + } + + void await_suspend(std::coroutine_handle<> continuation) { + auto promise_type = + std::coroutine_handle::from_address( + continuation.address()) + .promise(); + auto executor = promise_type._executor; + auto event = + std::make_shared(std::tuple_size()); + auto result = std::make_shared>(); + _result = result; + + auto input = std::move(_input); + + [&](std::index_sequence) { + ( + [&](auto& lazy, auto& callback) { + if (result->has_value()) { + return; + } + + if (!lazy._coro.promise()._executor) { + lazy._coro.promise()._executor = executor; + } + + lazy.start([result, event, continuation, + callback](auto&& res) mutable { + auto count = event->downCount(); + if (count == std::tuple_size() + 1) { + callback(std::move(res)); + *result = I; + continuation.resume(); + } + }); + }(std::get<0>(std::get(input)), + std::get<1>(std::get(input))), + ...); + } + (std::make_index_sequence()); + } + + auto await_resume() { + assert(_result != nullptr); + return std::move(_result->value()); + } + + std::tuple _input; + std::shared_ptr> _result; +}; + +template +struct SimpleCollectAnyVariadicPairAwaiter { + using InputType = std::tuple; + + InputType _inputs; + + SimpleCollectAnyVariadicPairAwaiter(Ts&&... inputs) + : _inputs(std::move(inputs)...) {} + + auto coAwait(Executor* ex) { + return CollectAnyVariadicPairAwaiter(std::move(_inputs)); + } }; template