Skip to content

Commit

Permalink
Merge branch 'main' into rpc_logic_simply
Browse files Browse the repository at this point in the history
  • Loading branch information
poor-circle authored Apr 2, 2024
2 parents 90e8b81 + 09aefcb commit acafe3d
Show file tree
Hide file tree
Showing 12 changed files with 358 additions and 55 deletions.
6 changes: 1 addition & 5 deletions cmake/develop.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,7 @@ option(CORO_RPC_USE_OTHER_RPC "coro_rpc extend to support other rpc" OFF)
message(STATUS "CORO_RPC_USE_OTHER_RPC: ${CORO_RPC_USE_OTHER_RPC}")

# Enable address sanitizer
if (CMAKE_CXX_COMPILER_ID STREQUAL "GNU")
option(ENABLE_SANITIZER "Enable sanitizer(Debug+Gcc/Clang/AppleClang)" OFF)
else()
option(ENABLE_SANITIZER "Enable sanitizer(Debug+Gcc/Clang/AppleClang)" ON)
endif()
option(ENABLE_SANITIZER "Enable sanitizer(Debug+Gcc/Clang/AppleClang)" ON)

if(ENABLE_SANITIZER AND NOT MSVC)
if(CMAKE_BUILD_TYPE STREQUAL "Debug")
Expand Down
63 changes: 61 additions & 2 deletions include/ylt/coro_io/coro_io.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@

namespace coro_io {

template <typename T>
constexpr inline bool is_lazy_v =
util::is_specialization_v<std::remove_cvref_t<T>, async_simple::coro::Lazy>;

template <typename Arg, typename Derived>
class callback_awaitor_base {
private:
Expand Down Expand Up @@ -395,9 +399,64 @@ async_simple::coro::Lazy<std::pair<
});
}

template <typename T>
inline decltype(auto) select_impl(T &pair) {
using Func = std::tuple_element_t<1, std::remove_cvref_t<T>>;
using ValueType =
typename std::tuple_element_t<0, std::remove_cvref_t<T>>::ValueType;
using return_type = std::invoke_result_t<Func, async_simple::Try<ValueType>>;

auto &callback = std::get<1>(pair);
if constexpr (coro_io::is_lazy_v<return_type>) {
auto executor = std::get<0>(pair).getExecutor();
return std::make_pair(
std::move(std::get<0>(pair)),
[executor, callback = std::move(callback)](auto &&val) {
if (executor) {
callback(std::move(val)).via(executor).start([](auto &&) {
});
}
else {
callback(std::move(val)).start([](auto &&) {
});
}
});
}
else {
return pair;
}
}

template <typename... T>
auto select(T &&...args) {
return async_simple::coro::collectAny(std::forward<T>(args)...);
inline auto select(T &&...args) {
return async_simple::coro::collectAny(select_impl(args)...);
}

template <typename T, typename Callback>
inline auto select(std::vector<T> vec, Callback callback) {
if constexpr (coro_io::is_lazy_v<Callback>) {
std::vector<async_simple::Executor *> executors;
for (auto &lazy : vec) {
executors.push_back(lazy.getExecutor());
}

return async_simple::coro::collectAny(
std::move(vec),
[executors, callback = std::move(callback)](size_t index, auto &&val) {
auto executor = executors[index];
if (executor) {
callback(index, std::move(val)).via(executor).start([](auto &&) {
});
}
else {
callback(index, std::move(val)).start([](auto &&) {
});
}
});
}
else {
return async_simple::coro::collectAny(std::move(vec), std::move(callback));
}
}

template <typename Socket, typename AsioBuffer>
Expand Down
90 changes: 76 additions & 14 deletions include/ylt/coro_rpc/impl/coro_rpc_server.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <asio/error_code.hpp>
#include <asio/io_context.hpp>
#include <atomic>
#include <charconv>
#include <condition_variable>
#include <cstdint>
#include <exception>
Expand Down Expand Up @@ -71,20 +72,36 @@ class coro_rpc_server_base {
* default no timeout.
*/
coro_rpc_server_base(size_t thread_num, unsigned short port,
std::string address = "0.0.0.0",
std::chrono::steady_clock::duration
conn_timeout_duration = std::chrono::seconds(0))
: pool_(thread_num),
acceptor_(pool_.get_executor()->get_asio_executor()),
port_(port),
conn_timeout_duration_(conn_timeout_duration),
flag_{stat::init} {}
flag_{stat::init} {
init_address(std::move(address));
}

coro_rpc_server_base(size_t thread_num,
std::string address /* = "0.0.0.0:9001" */,
std::chrono::steady_clock::duration
conn_timeout_duration = std::chrono::seconds(0))
: pool_(thread_num),
acceptor_(pool_.get_executor()->get_asio_executor()),
conn_timeout_duration_(conn_timeout_duration),
flag_{stat::init} {
init_address(std::move(address));
}

coro_rpc_server_base(const server_config &config = server_config{})
: pool_(config.thread_num),
acceptor_(pool_.get_executor()->get_asio_executor()),
port_(config.port),
conn_timeout_duration_(config.conn_timeout_duration),
flag_{stat::init} {}
flag_{stat::init} {
init_address(config.address);
}

~coro_rpc_server_base() {
ELOGV(INFO, "coro_rpc_server will quit");
Expand Down Expand Up @@ -118,7 +135,6 @@ class coro_rpc_server_base {
[[nodiscard]] coro_rpc::expected<async_simple::Future<coro_rpc::err_code>,
coro_rpc::err_code>
async_start() noexcept {
coro_rpc::err_code ec{};
{
std::unique_lock lock(start_mtx_);
if (flag_ != stat::init) {
Expand All @@ -131,8 +147,8 @@ class coro_rpc_server_base {
return coro_rpc::unexpected<coro_rpc::err_code>{
coro_rpc::errc::server_has_ran};
}
ec = listen();
if (!ec) {
errc_ = listen();
if (!errc_) {
if constexpr (requires(typename server_config::executor_pool_t & pool) {
pool.run();
}) {
Expand All @@ -146,12 +162,13 @@ class coro_rpc_server_base {
flag_ = stat::stop;
}
}
if (!ec) {
if (!errc_) {
async_simple::Promise<coro_rpc::err_code> promise;
auto future = promise.getFuture();
accept().start([p = std::move(promise)](auto &&res) mutable {
accept().start([this, p = std::move(promise)](auto &&res) mutable {
if (res.hasError()) {
p.setValue(coro_rpc::err_code{coro_rpc::errc::io_error});
errc_ = coro_rpc::err_code{coro_rpc::errc::io_error};
p.setValue(errc_);
}
else {
p.setValue(res.value());
Expand All @@ -160,7 +177,7 @@ class coro_rpc_server_base {
return std::move(future);
}
else {
return coro_rpc::unexpected<coro_rpc::err_code>{ec};
return coro_rpc::unexpected<coro_rpc::err_code>{errc_};
}
}

Expand Down Expand Up @@ -207,6 +224,8 @@ class coro_rpc_server_base {
* @return
*/
uint16_t port() const { return port_; };
std::string_view address() const { return address_; }
coro_rpc::err_code get_errc() const { return errc_; }

/*!
* Register RPC service functions (member function)
Expand Down Expand Up @@ -288,12 +307,27 @@ class coro_rpc_server_base {
coro_rpc::err_code listen() {
ELOGV(INFO, "begin to listen");
using asio::ip::tcp;
auto endpoint = tcp::endpoint(tcp::v4(), port_);
acceptor_.open(endpoint.protocol());
asio::error_code ec;
asio::ip::tcp::resolver::query query(address_, std::to_string(port_));
asio::ip::tcp::resolver resolver(acceptor_.get_executor());
asio::ip::tcp::resolver::iterator it = resolver.resolve(query, ec);

asio::ip::tcp::resolver::iterator it_end;
if (ec || it == it_end) {
ELOGV(ERROR, "resolve address %s error : %s", address_.data(),
ec.message().data());
return coro_rpc::errc::bad_address;
}

auto endpoint = it->endpoint();
acceptor_.open(endpoint.protocol(), ec);
if (ec) {
ELOGV(ERROR, "open failed, error : %s", ec.message().data());
return coro_rpc::errc::open_error;
}
#ifdef __GNUC__
acceptor_.set_option(tcp::acceptor::reuse_address(true));
acceptor_.set_option(tcp::acceptor::reuse_address(true), ec);
#endif
asio::error_code ec;
acceptor_.bind(endpoint, ec);
if (ec) {
ELOGV(ERROR, "bind port %d error : %s", port_.load(),
Expand All @@ -305,7 +339,14 @@ class coro_rpc_server_base {
#ifdef _MSC_VER
acceptor_.set_option(tcp::acceptor::reuse_address(true));
#endif
acceptor_.listen();
acceptor_.listen(asio::socket_base::max_listen_connections, ec);
if (ec) {
ELOGV(ERROR, "port %d listen error : %s", port_.load(),
ec.message().data());
acceptor_.cancel(ec);
acceptor_.close(ec);
return coro_rpc::errc::listen_error;
}

auto end_point = acceptor_.local_endpoint(ec);
if (ec) {
Expand Down Expand Up @@ -383,6 +424,25 @@ class coro_rpc_server_base {
acceptor_close_waiter_.get_future().wait();
}

void init_address(std::string address) {
if (size_t pos = address.find(':'); pos != std::string::npos) {
auto port_sv = std::string_view(address).substr(pos + 1);

uint16_t port;
auto [ptr, ec] = std::from_chars(
port_sv.data(), port_sv.data() + port_sv.size(), port, 10);
if (ec != std::errc{}) {
address_ = std::move(address);
return;
}

port_ = port;
address = address.substr(0, pos);
}

address_ = std::move(address);
}

typename server_config::executor_pool_t pool_;
asio::ip::tcp::acceptor acceptor_;
std::promise<void> acceptor_close_waiter_;
Expand All @@ -398,6 +458,8 @@ class coro_rpc_server_base {
typename server_config::rpc_protocol::router router_;

std::atomic<uint16_t> port_;
std::string address_;
coro_rpc::err_code errc_ = {};
std::chrono::steady_clock::duration conn_timeout_duration_;

#ifdef YLT_ENABLE_SSL
Expand Down
9 changes: 9 additions & 0 deletions include/ylt/coro_rpc/impl/errno.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ enum class errc : uint16_t {
timed_out,
invalid_rpc_arguments,
address_in_used,
bad_address,
open_error,
listen_error,
operation_canceled,
rpc_throw_exception,
function_not_registered,
Expand All @@ -47,6 +50,12 @@ inline constexpr std::string_view make_error_message(errc ec) noexcept {
return "invalid rpc arg";
case errc::address_in_used:
return "address in used";
case errc::bad_address:
return "bad_address";
case errc::open_error:
return "open_error";
case errc::listen_error:
return "listen_error";
case errc::operation_canceled:
return "operation canceled";
case errc::rpc_throw_exception:
Expand Down
8 changes: 4 additions & 4 deletions include/ylt/easylog/record.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ template <typename Type, typename = void>
struct has_data : std::false_type {};

template <typename T>
struct has_data<T, std::void_t<decltype(std::declval<T>().data())>>
: std::true_type {};
struct has_data<T, std::void_t<decltype(std::declval<std::string>().append(
std::declval<T>().data()))>> : std::true_type {};

template <typename T>
constexpr inline bool has_data_v = has_data<std::remove_cvref_t<T>>::value;
Expand All @@ -73,8 +73,8 @@ template <typename Type, typename = void>
struct has_str : std::false_type {};

template <typename T>
struct has_str<T, std::void_t<decltype(std::declval<T>().str())>>
: std::true_type {};
struct has_str<T, std::void_t<decltype(std::declval<std::string>().append(
std::declval<T>().str()))>> : std::true_type {};

template <typename T>
constexpr inline bool has_str_v = has_str<std::remove_cvref_t<T>>::value;
Expand Down
Loading

0 comments on commit acafe3d

Please sign in to comment.