Skip to content

Commit

Permalink
Merge branch 'main' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
qicosmos authored Mar 16, 2024
2 parents fa7af6c + 5f627b7 commit b0e5dfd
Show file tree
Hide file tree
Showing 49 changed files with 3,613 additions and 2,682 deletions.
7 changes: 6 additions & 1 deletion cmake/develop.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,12 @@ option(CORO_RPC_USE_OTHER_RPC "coro_rpc extend to support other rpc" OFF)
message(STATUS "CORO_RPC_USE_OTHER_RPC: ${CORO_RPC_USE_OTHER_RPC}")

# Enable address sanitizer
option(ENABLE_SANITIZER "Enable sanitizer(Debug+Gcc/Clang/AppleClang)" ON)
if (CMAKE_CXX_COMPILER_ID STREQUAL "GNU")
option(ENABLE_SANITIZER "Enable sanitizer(Debug+Gcc/Clang/AppleClang)" OFF)
else()
option(ENABLE_SANITIZER "Enable sanitizer(Debug+Gcc/Clang/AppleClang)" ON)
endif()

if(ENABLE_SANITIZER AND NOT MSVC)
if(CMAKE_BUILD_TYPE STREQUAL "Debug")
check_asan(HAS_ASAN)
Expand Down
34 changes: 29 additions & 5 deletions include/ylt/coro_io/coro_io.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#pragma once

#include <async_simple/Executor.h>
#include <async_simple/coro/Collect.h>
#include <async_simple/coro/Lazy.h>
#include <async_simple/coro/Sleep.h>
#include <async_simple/coro/SyncAwait.h>
Expand Down Expand Up @@ -349,8 +350,24 @@ post(Func func,
co_return co_await awaitor.await_resume(helper);
}

template <typename R>
struct coro_channel
: public asio::experimental::channel<void(std::error_code, R)> {
using return_type = R;
using ValueType = std::pair<std::error_code, R>;
using asio::experimental::channel<void(std::error_code, R)>::channel;
};

template <typename R>
inline coro_channel<R> create_channel(
size_t capacity,
asio::io_context::executor_type executor =
coro_io::get_global_block_executor()->get_asio_executor()) {
return coro_channel<R>(executor, capacity);
}

template <typename T>
async_simple::coro::Lazy<std::error_code> async_send(
inline async_simple::coro::Lazy<std::error_code> async_send(
asio::experimental::channel<void(std::error_code, T)> &channel, T val) {
callback_awaitor<std::error_code> awaitor;
co_return co_await awaitor.await_resume(
Expand All @@ -361,17 +378,24 @@ async_simple::coro::Lazy<std::error_code> async_send(
});
}

template <typename R>
async_simple::coro::Lazy<std::pair<std::error_code, R>> async_receive(
asio::experimental::channel<void(std::error_code, R)> &channel) {
callback_awaitor<std::pair<std::error_code, R>> awaitor;
template <typename Channel>
async_simple::coro::Lazy<std::pair<
std::error_code,
typename Channel::return_type>> inline async_receive(Channel &channel) {
callback_awaitor<std::pair<std::error_code, typename Channel::return_type>>
awaitor;
co_return co_await awaitor.await_resume([&](auto handler) {
channel.async_receive([handler](auto ec, auto val) {
handler.set_value_then_resume(std::make_pair(ec, std::move(val)));
});
});
}

template <typename... T>
auto select(T &&...args) {
return async_simple::coro::collectAny(std::forward<T>(args)...);
}

template <typename Socket, typename AsioBuffer>
std::pair<asio::error_code, size_t> read_some(Socket &sock,
AsioBuffer &&buffer) {
Expand Down
73 changes: 0 additions & 73 deletions include/ylt/thirdparty/async_simple/CMakeLists.txt

This file was deleted.

75 changes: 37 additions & 38 deletions include/ylt/thirdparty/async_simple/Collect.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2023, Alibaba Group Holding Limited;
* Copyright (c) 2022, Alibaba Group Holding Limited;
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,14 +17,14 @@
#define ASYNC_SIMPLE_COLLECT_H

#include <exception>
#include <iostream>
#include <iterator>
#include <vector>

#include "async_simple/Common.h"
#include "async_simple/Future.h"
#include "async_simple/Try.h"

#include <iostream>

namespace async_simple {

// collectAll - collect all the values for a range of futures.
Expand All @@ -50,48 +50,47 @@ template <std::input_iterator Iterator>
inline Future<std::vector<
Try<typename std::iterator_traits<Iterator>::value_type::value_type>>>
collectAll(Iterator begin, Iterator end) {
using T = typename std::iterator_traits<Iterator>::value_type::value_type;
size_t n = std::distance(begin, end);
using T = typename std::iterator_traits<Iterator>::value_type::value_type;
size_t n = std::distance(begin, end);

bool allReady = true;
for (auto iter = begin; iter != end; ++iter) {
if (!iter->hasResult()) {
allReady = false;
break;
}
}
if (allReady) {
std::vector<Try<T>> results;
results.reserve(n);
bool allReady = true;
for (auto iter = begin; iter != end; ++iter) {
results.push_back(std::move(iter->result()));
if (!iter->hasResult()) {
allReady = false;
break;
}
}
if (allReady) {
std::vector<Try<T>> results;
results.reserve(n);
for (auto iter = begin; iter != end; ++iter) {
results.push_back(std::move(iter->result()));
}
return Future<std::vector<Try<T>>>(std::move(results));
}
return Future<std::vector<Try<T>>>(std::move(results));
}

Promise<std::vector<Try<T>>> promise;
auto future = promise.getFuture();
Promise<std::vector<Try<T>>> promise;
auto future = promise.getFuture();

struct Context {
Context(size_t n, Promise<std::vector<Try<T>>> p_)
: results(n), p(std::move(p_)) {}
~Context() { p.setValue(std::move(results)); }
std::vector<Try<T>> results;
Promise<std::vector<Try<T>>> p;
};
struct Context {
Context(size_t n, Promise<std::vector<Try<T>>> p_)
: results(n), p(std::move(p_)) {}
~Context() { p.setValue(std::move(results)); }
std::vector<Try<T>> results;
Promise<std::vector<Try<T>>> p;
};

auto ctx = std::make_shared<Context>(n, std::move(promise));
for (size_t i = 0; i < n; ++i, ++begin) {
if (begin->hasResult()) {
ctx->results[i] = std::move(begin->result());
}
else {
begin->setContinuation([ctx, i](Try<T>&& t) mutable {
ctx->results[i] = std::move(t);
});
auto ctx = std::make_shared<Context>(n, std::move(promise));
for (size_t i = 0; i < n; ++i, ++begin) {
if (begin->hasResult()) {
ctx->results[i] = std::move(begin->result());
} else {
begin->setContinuation([ctx, i](Try<T>&& t) mutable {
ctx->results[i] = std::move(t);
});
}
}
}
return future;
return future;
}

} // namespace async_simple
Expand Down
22 changes: 18 additions & 4 deletions include/ylt/thirdparty/async_simple/Common.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2023, Alibaba Group Holding Limited;
* Copyright (c) 2022, Alibaba Group Holding Limited;
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -44,6 +44,20 @@
#endif // __SANITIZE_ADDRESS__
#endif // __GNUC__

#if defined(__alibaba_clang__) && \
__has_cpp_attribute(ACC::coro_only_destroy_when_complete)
#define CORO_ONLY_DESTROY_WHEN_DONE [[ACC::coro_only_destroy_when_complete]]
#else
#define CORO_ONLY_DESTROY_WHEN_DONE
#endif

#if defined(__alibaba_clang__) && \
__has_cpp_attribute(ACC::elideable_after_await)
#define ELIDEABLE_AFTER_AWAIT [[ACC::elideable_after_await]]
#else
#define ELIDEABLE_AFTER_AWAIT
#endif

namespace async_simple {
// Different from assert, logicAssert is meaningful in
// release mode. logicAssert should be used in case that
Expand All @@ -52,9 +66,9 @@ namespace async_simple {
// a bug in the library. If logicAssert fails, it means
// there is a bug in the user code.
inline void logicAssert(bool x, const char* errorMsg) {
if (x)
AS_LIKELY { return; }
throw std::logic_error(errorMsg);
if (x)
AS_LIKELY { return; }
throw std::logic_error(errorMsg);
}

} // namespace async_simple
Expand Down
Loading

0 comments on commit b0e5dfd

Please sign in to comment.