Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
poor-circle committed Jul 13, 2023
1 parent b08e2de commit db54bfe
Show file tree
Hide file tree
Showing 13 changed files with 197 additions and 291 deletions.
3 changes: 1 addition & 2 deletions .github/workflows/ubuntu_clang.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,9 @@ jobs:

- name: Configure
run: |
CXX=clang++ CC=clang
cmake -B ${{github.workspace}}/build -G Ninja \
-DCMAKE_BUILD_TYPE=${{matrix.mode}} -DBUILD_WITH_LIBCXX=${{matrix.libcxx}} -DENABLE_SSL=${{matrix.ssl}} \
-DUSE_CCACHE=${{env.ccache}}
-DUSE_CCACHE=${{env.ccache}} -DCMAKE_C_COMPILER=clang -DCMAKE_CXX_COMPILER=clang++
- name: Build
run: cmake --build ${{github.workspace}}/build --config ${{matrix.mode}}

Expand Down
2 changes: 1 addition & 1 deletion include/ylt/coro_io/channel.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ class channel {
channel(const channel& o) = delete;
channel& operator=(const channel& o) = delete;

auto send_request(auto op, const typename client_t::config& config)
auto send_request(auto op, typename client_t::config& config)
-> decltype(std::declval<client_pool_t>().send_request(std::move(op),
std::string_view{},
config)) {
Expand Down
29 changes: 8 additions & 21 deletions include/ylt/coro_io/client_pool.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,10 +99,8 @@ class client_pool : public std::enable_shared_from_this<
bool ok = false;

for (int i = 0; !ok && i < pool_config_.connect_retry_count; ++i) {
co_await coro_io::sleep_for(pool_config_.reconnect_wait_time);
ok = (client_t::is_ok(co_await client->reconnect(host_name_)));
if (!ok) {
co_await coro_io::sleep_for(pool_config_.reconnect_wait_time);
}
}
co_return ok ? std::move(client) : nullptr;
}
Expand All @@ -115,17 +113,7 @@ class client_pool : public std::enable_shared_from_this<
if (!free_clients_.try_dequeue(client)) {
break;
}
if (client->has_closed()) {
[self = this->shared_from_this()](std::unique_ptr<client_t> client)
-> async_simple::coro::Lazy<void> {
self->collect_free_client(
co_await self->reconnect(std::move(client)));
co_return;
}(std::move(client))
.start([](auto&& v) {
});
}
else {
if (!client->has_closed()) {
break;
}
}
Expand Down Expand Up @@ -232,7 +220,7 @@ class client_pool : public std::enable_shared_from_this<

template <typename T>
async_simple::coro::Lazy<return_type<T>> send_request(
T op, const typename client_t::config& client_config) {
T op, typename client_t::config& client_config) {
// return type: Lazy<expected<T::returnType,std::errc>>
auto client = co_await get_client(client_config);
if (!client) {
Expand All @@ -252,7 +240,7 @@ class client_pool : public std::enable_shared_from_this<

template <typename T>
decltype(auto) send_request(T op) {
return send_request(op, pool_config_.client_config);
return send_request(std::move(op), pool_config_.client_config);
}

std::size_t free_client_count() const noexcept {
Expand All @@ -271,7 +259,7 @@ class client_pool : public std::enable_shared_from_this<
template <typename T>
async_simple::coro::Lazy<return_type_with_host<T>> send_request(
T op, std::string_view endpoint,
const typename client_t::config& client_config) {
typename client_t::config& client_config) {
// return type: Lazy<expected<T::returnType,std::errc>>
auto client = co_await get_client(client_config);
if (!client) {
Expand All @@ -293,7 +281,7 @@ class client_pool : public std::enable_shared_from_this<

template <typename T>
decltype(auto) send_request(T op, std::string_view sv) {
return send_request(op, sv, pool_config_.client_config);
return send_request(std::move(op), sv, pool_config_.client_config);
}

coro_io::detail::client_queue<std::unique_ptr<client_t>> free_clients_;
Expand Down Expand Up @@ -322,11 +310,10 @@ class client_pools {
co_return ret;
}
auto send_request(std::string_view host_name,
const typename client_pool_t::pool_config& pool_config,
auto op)
typename client_pool_t::pool_config& pool_config, auto op)
-> decltype(std::declval<client_pool_t>().send_request(std::move(op))) {
auto pool = get_client_pool(host_name, pool_config);
auto ret = co_await pool.send_request(std::move(op));
auto ret = co_await pool->send_request(std::move(op));
co_return ret;
}
auto at(std::string_view host_name) {
Expand Down
7 changes: 4 additions & 3 deletions include/ylt/coro_io/io_context_pool.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,16 +72,17 @@ class ExecutorWrapper : public async_simple::Executor {

private:
void schedule(Func func, Duration dur) override {
auto timer = std::make_shared<asio::steady_timer>(executor_, dur);
timer->async_wait([fn = std::move(func), timer](auto ec) {
auto timer = std::make_unique<asio::steady_timer>(executor_, dur);
auto tm = timer.get();
tm->async_wait([fn = std::move(func), timer = std::move(timer)](auto ec) {
fn();
});
}
};

template <typename ExecutorImpl = asio::io_context>
inline async_simple::coro::Lazy<typename ExecutorImpl::executor_type>
get_executor() {
get_current_executor() {
auto executor = co_await async_simple::CurrentExecutor{};
assert(executor != nullptr);
co_return static_cast<ExecutorImpl *>(executor->checkout())->get_executor();
Expand Down
84 changes: 34 additions & 50 deletions include/ylt/coro_rpc/impl/coro_rpc_server.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,12 @@
#include <vector>
#include <ylt/easylog.hpp>

#include "async_simple/Promise.h"
#include "common_service.hpp"
#include "coro_connection.hpp"
#include "ylt/coro_io/coro_io.hpp"
#include "ylt/coro_io/io_context_pool.hpp"
#include "ylt/coro_rpc/impl/expected.hpp"
namespace coro_rpc {
/*!
* ```cpp
Expand Down Expand Up @@ -102,48 +104,18 @@ class coro_rpc_server_base {
* @return error code if start failed, otherwise block until server stop.
*/
[[nodiscard]] std::errc start() noexcept {
std::errc ec{};
{
std::unique_lock lock(start_mtx_);
if (flag_ != stat::init) {
if (flag_ == stat::started) {
ELOGV(INFO, "start again");
}
else if (flag_ == stat::stop) {
ELOGV(INFO, "has stoped");
}
return std::errc::io_error;
}

ec = listen();
if (ec == std::errc{}) {
thd_ = std::thread([this] {
pool_.run();
});

flag_ = stat::started;
}
else {
flag_ = stat::stop;
}
auto ret = async_start();
if (ret) {
ret.value().wait();
return ret.value().value();
}

cond_.notify_all();

if (ec == std::errc{}) {
async_simple::coro::syncAwait(accept());
else {
return ret.error();
}
return ec;
}

[[nodiscard]] bool wait_for_start(auto duration) {
std::unique_lock lock(start_mtx_);
return cond_.wait_for(lock, duration, [this] {
return flag_ == stat::started || flag_ == stat::stop;
});
}

[[nodiscard]] async_simple::coro::Lazy<std::errc> async_start() noexcept {
[[nodiscard]] coro_rpc::expected<async_simple::Future<std::errc>, std::errc>
async_start() noexcept {
std::errc ec{};
{
std::unique_lock lock(start_mtx_);
Expand All @@ -154,9 +126,9 @@ class coro_rpc_server_base {
else if (flag_ == stat::stop) {
ELOGV(INFO, "has stoped");
}
co_return std::errc::io_error;
return coro_rpc::unexpected<std::errc>{
std::errc::resource_unavailable_try_again};
}

ec = listen();
if (ec == std::errc{}) {
if constexpr (requires(typename server_config::executor_pool_t & pool) {
Expand All @@ -172,12 +144,22 @@ class coro_rpc_server_base {
flag_ = stat::stop;
}
}

cond_.notify_all();
if (ec == std::errc{}) {
co_await accept();
async_simple::Promise<std::errc> promise;
auto future = promise.getFuture();
accept().start([p = std::move(promise)](auto &&res) mutable {
if (res.hasError()) {
p.setValue(std::errc::io_error);
}
else {
p.setValue(res.value());
}
});
return std::move(future);
}
else {
return coro_rpc::unexpected<std::errc>{ec};
}
co_return ec;
}

/*!
Expand All @@ -193,8 +175,8 @@ class coro_rpc_server_base {

ELOGV(INFO, "begin to stop coro_rpc_server, conn size %d", conns_.size());

close_acceptor();
if (flag_ == stat::started) {
close_acceptor();
{
std::unique_lock lock(conns_mtx_);
for (auto &conn : conns_) {
Expand Down Expand Up @@ -350,9 +332,11 @@ class coro_rpc_server_base {
}
#endif
if (error) {
ELOGV(ERROR, "accept failed, error: %s", error.message().data());
if (error == asio::error::operation_aborted) {
co_return std::errc::io_error;
ELOGV(INFO, "accept failed, error: %s", error.message().data());
if (error == asio::error::operation_aborted ||
error == asio::error::bad_descriptor) {
acceptor_close_waiter_.set_value();
co_return std::errc::operation_canceled;
}
continue;
}
Expand Down Expand Up @@ -392,17 +376,17 @@ class coro_rpc_server_base {
acceptor_.cancel(ec);
acceptor_.close(ec);
});
acceptor_close_waiter_.get_future().wait();
}

typename server_config::executor_pool_t pool_;
asio::ip::tcp::acceptor acceptor_;
std::promise<void> acceptor_close_waiter_;

std::thread thd_;
stat flag_;

std::mutex start_mtx_;
std::condition_variable cond_;

uint64_t conn_id_ = 0;
std::unordered_map<uint64_t, std::shared_ptr<coro_connection>> conns_;
std::mutex conns_mtx_;
Expand Down
27 changes: 10 additions & 17 deletions src/coro_io/tests/test_channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,8 @@
TEST_CASE("test RR") {
async_simple::coro::syncAwait([]() -> async_simple::coro::Lazy<void> {
coro_rpc::coro_rpc_server server(1, 8801);
server.async_start().start([](auto &&) {
});
auto is_started = server.wait_for_start(std::chrono::seconds{1});
CHECK(is_started);
auto res = server.async_start();
REQUIRE_MESSAGE(res, "server start failed");
auto hosts =
std::vector<std::string_view>{"127.0.0.1:8801", "localhost:8801"};
auto channel = coro_io::channel<coro_rpc::coro_rpc_client>::create(hosts);
Expand All @@ -46,10 +44,8 @@ TEST_CASE("test RR") {
TEST_CASE("test Random") {
async_simple::coro::syncAwait([]() -> async_simple::coro::Lazy<void> {
coro_rpc::coro_rpc_server server(1, 8801);
server.async_start().start([](auto &&) {
});
auto is_started = server.wait_for_start(std::chrono::seconds{1});
CHECK(is_started);
auto res = server.async_start();
REQUIRE_MESSAGE(res, "server start failed");
auto hosts =
std::vector<std::string_view>{"127.0.0.1:8801", "localhost:8801"};
auto channel = coro_io::channel<coro_rpc::coro_rpc_client>::create(
Expand Down Expand Up @@ -77,10 +73,8 @@ TEST_CASE("test Random") {
TEST_CASE("test single host") {
async_simple::coro::syncAwait([]() -> async_simple::coro::Lazy<void> {
coro_rpc::coro_rpc_server server(1, 8801);
server.async_start().start([](auto &&) {
});
auto is_started = server.wait_for_start(std::chrono::seconds{1});
CHECK(is_started);
auto res = server.async_start();
REQUIRE_MESSAGE(res, "server start failed");
auto hosts = std::vector<std::string_view>{"127.0.0.1:8801"};
auto channel = coro_io::channel<coro_rpc::coro_rpc_client>::create(hosts);
for (int i = 0; i < 100; ++i) {
Expand All @@ -100,20 +94,19 @@ TEST_CASE("test single host") {
TEST_CASE("test send_request config") {
async_simple::coro::syncAwait([]() -> async_simple::coro::Lazy<void> {
coro_rpc::coro_rpc_server server(1, 8802);
server.async_start().start([](auto &&) {
});
auto is_started = server.wait_for_start(std::chrono::seconds{1});
CHECK(is_started);
auto res = server.async_start();
REQUIRE_MESSAGE(res, "server start failed");
auto hosts = std::vector<std::string_view>{"127.0.0.1:8802"};
auto channel = coro_io::channel<coro_rpc::coro_rpc_client>::create(hosts);
for (int i = 0; i < 100; ++i) {
auto config = coro_rpc::coro_rpc_client::config{.client_id = 114514};
auto res = co_await channel.send_request(
[&i, &hosts](coro_rpc::coro_rpc_client &client, std::string_view host)
-> async_simple::coro::Lazy<void> {
CHECK(client.get_client_id() == 114514);
co_return;
},
coro_rpc::coro_rpc_client::config{.client_id = 114514});
config);
CHECK(res.has_value());
}
server.stop();
Expand Down
Loading

0 comments on commit db54bfe

Please sign in to comment.