Skip to content

Commit

Permalink
remove inner thread in coro_http_client (qicosmos#365)
Browse files Browse the repository at this point in the history
  • Loading branch information
qicosmos authored Jul 13, 2023
1 parent ba0e177 commit 019296a
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 75 deletions.
56 changes: 16 additions & 40 deletions include/cinatra/coro_http_client.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
#pragma once
#include <asio/streambuf.hpp>
#include <atomic>
#include <cassert>
#include <charconv>
Expand All @@ -15,6 +14,8 @@
#include <utility>

#include "asio/dispatch.hpp"
#include "asio/error.hpp"
#include "asio/streambuf.hpp"
#include "async_simple/Future.h"
#include "async_simple/Unit.h"
#include "async_simple/coro/FutureAwaiter.h"
Expand All @@ -25,6 +26,7 @@
#include "websocket.hpp"
#include "ylt/coro_io/coro_file.hpp"
#include "ylt/coro_io/coro_io.hpp"
#include "ylt/coro_io/io_context_pool.hpp"

namespace coro_io {
template <typename T, typename U>
Expand Down Expand Up @@ -180,28 +182,14 @@ class coro_http_client {
std::string domain;
#endif
};
coro_http_client()
: io_ctx_(std::make_unique<asio::io_context>()),
socket_(std::make_shared<socket_t>(io_ctx_->get_executor())),
executor_wrapper_(io_ctx_->get_executor()),
timer_(&executor_wrapper_) {
std::promise<void> promise;
io_thd_ = std::thread([this, &promise] {
work_ = std::make_unique<asio::io_context::work>(*io_ctx_);
executor_wrapper_.schedule([&] {
promise.set_value();
});
io_ctx_->run();
});
promise.get_future().wait();
}

coro_http_client(asio::io_context::executor_type executor)
: socket_(std::make_shared<socket_t>(executor)),
executor_wrapper_(executor),
timer_(&executor_wrapper_) {}

coro_http_client(coro_io::ExecutorWrapper<> *executor)
coro_http_client(
coro_io::ExecutorWrapper<> *executor = coro_io::get_global_executor())
: coro_http_client(executor->get_asio_executor()) {}

bool init_config(const config &conf) {
Expand Down Expand Up @@ -236,22 +224,7 @@ class coro_http_client {
return true;
}

~coro_http_client() {
async_close();
if (io_thd_.joinable()) {
work_ = nullptr;
if (io_thd_.get_id() == std::this_thread::get_id()) {
std::thread thrd{[io_ctx = std::move(io_ctx_),
io_thd = std::move(io_thd_)]() mutable {
io_thd.join();
}};
thrd.detach();
}
else {
io_thd_.join();
}
}
}
~coro_http_client() { async_close(); }

void async_close() {
if (socket_->has_closed_)
Expand All @@ -263,10 +236,9 @@ class coro_http_client {
}

#ifdef CINATRA_ENABLE_SSL
[[nodiscard]] bool init_ssl(const std::string &base_path,
const std::string &cert_file,
int verify_mode = asio::ssl::verify_none,
const std::string &domain = "localhost") {
bool init_ssl(const std::string &base_path, const std::string &cert_file,
int verify_mode = asio::ssl::verify_none,
const std::string &domain = "localhost") {
try {
ssl_init_ret_ = false;
auto full_cert_file = std::filesystem::path(base_path).append(cert_file);
Expand Down Expand Up @@ -1520,6 +1492,7 @@ class coro_http_client {
if (auto [ec, _] = co_await async_read(read_buf_, header_size); ec) {
data.net_err = ec;
data.status = 404;
close_socket(*socket_);
if (on_ws_msg_)
on_ws_msg_(data);
co_return;
Expand All @@ -1543,6 +1516,7 @@ class coro_http_client {
ec) {
data.net_err = ec;
data.status = 404;
close_socket(*socket_);
if (on_ws_msg_)
on_ws_msg_(data);
co_return;
Expand All @@ -1566,6 +1540,11 @@ class coro_http_client {
on_ws_close_(data.resp_body);
co_await async_send_ws("close", false, opcode::close);
async_close();

data.net_err = asio::error::eof;
data.status = 404;
if (on_ws_msg_)
on_ws_msg_(data);
co_return;
}
if (on_ws_msg_)
Expand Down Expand Up @@ -1656,12 +1635,9 @@ class coro_http_client {
return has_http_scheme;
}

std::unique_ptr<asio::io_context> io_ctx_;

coro_io::ExecutorWrapper<> executor_wrapper_;
std::unique_ptr<asio::io_context::work> work_;
coro_io::period_timer timer_;
std::thread io_thd_;
std::shared_ptr<socket_t> socket_;
asio::streambuf read_buf_;
simple_buffer body_{};
Expand Down
37 changes: 27 additions & 10 deletions include/cinatra/ylt/coro_io/io_context_pool.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022, Alibaba Group Holding Limited;
* Copyright (c) 2023, Alibaba Group Holding Limited;
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -15,19 +15,19 @@
*/
#pragma once
#include <async_simple/Executor.h>
#include <async_simple/coro/Lazy.h>

#include <asio/io_context.hpp>
#include <asio/post.hpp>
#include <asio/steady_timer.hpp>
#include <atomic>
#include <future>
#include <memory>
#include <mutex>
#include <thread>
#include <type_traits>
#include <vector>

#include "async_simple/coro/Lazy.h"

namespace coro_io {

template <typename ExecutorImpl = asio::io_context::executor_type>
Expand Down Expand Up @@ -107,6 +107,12 @@ class io_context_pool {
}

void run() {
bool has_run_or_stop = false;
bool ok = has_run_or_stop_.compare_exchange_strong(has_run_or_stop, true);
if (!ok) {
return;
}

std::vector<std::shared_ptr<std::thread>> threads;
for (std::size_t i = 0; i < io_contexts_.size(); ++i) {
threads.emplace_back(std::make_shared<std::thread>(
Expand All @@ -123,15 +129,24 @@ class io_context_pool {
}

void stop() {
work_.clear();
promise_.get_future().wait();
return;
std::call_once(flag_, [this] {
bool has_run_or_stop = false;
bool ok = has_run_or_stop_.compare_exchange_strong(has_run_or_stop, true);

work_.clear();

if (ok) {
return;
}

promise_.get_future().wait();
});
}

// ~io_context_pool() {
// if (!has_stop())
// stop();
// }
~io_context_pool() {
if (!has_stop())
stop();
}

std::size_t pool_size() const noexcept { return io_contexts_.size(); }

Expand All @@ -157,6 +172,8 @@ class io_context_pool {
std::vector<work_ptr> work_;
std::atomic<std::size_t> next_io_context_;
std::promise<void> promise_;
std::atomic<bool> has_run_or_stop_ = false;
std::once_flag flag_;
};

class multithread_context_pool {
Expand Down
3 changes: 1 addition & 2 deletions tests/test_cinatra.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -458,8 +458,7 @@ TEST_CASE("test coro_http_client chunked download") {
std::filesystem::remove(filename, ec);
auto r = client.download(uri, filename);
if (!r.net_err)
;
CHECK(r.status >= 200);
CHECK(r.status >= 200);
}

TEST_CASE("test coro_http_client get") {
Expand Down
60 changes: 37 additions & 23 deletions tests/test_cinatra_websocket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ TEST_CASE("test wss client") {
client.on_ws_msg([&promise](resp_data data) {
if (data.net_err) {
std::cout << data.net_err.message() << "\n";
promise.set_value();
return;
}

Expand All @@ -63,14 +64,16 @@ TEST_CASE("test wss client") {
}
#endif

async_simple::coro::Lazy<void> test_websocket(coro_http_client &client) {
async_simple::coro::Lazy<void> test_websocket(coro_http_client &client,
std::promise<void> &promise) {
client.on_ws_close([](std::string_view reason) {
std::cout << "web socket close " << reason << std::endl;
CHECK(reason == "ws close");
});
client.on_ws_msg([](resp_data data) {
client.on_ws_msg([&](resp_data data) {
if (data.net_err) {
std::cout << data.net_err.message() << "\n";
promise.set_value();
return;
}

Expand Down Expand Up @@ -133,7 +136,12 @@ TEST_CASE("test websocket") {

coro_http_client client;
client.set_ws_sec_key("s//GYHa/XO7Hd2F2eOGfyA==");
async_simple::coro::syncAwait(test_websocket(client));

std::promise<void> promise;
async_simple::coro::syncAwait(test_websocket(client, promise));

client.async_close();
promise.get_future().wait();

std::this_thread::sleep_for(std::chrono::milliseconds(300));

Expand Down Expand Up @@ -168,34 +176,37 @@ void test_websocket_content(size_t len) {
REQUIRE(async_simple::coro::syncAwait(
client.async_ws_connect("ws://localhost:8090")));

auto promise = std::make_shared<std::promise<void>>();
std::weak_ptr<std::promise<void>> weak = promise;
std::pair<std::promise<void>, bool> msg_pair_promise{};

std::string send_str(len, 'a');

std::string str(len, '\0');
client.on_ws_msg([&str, weak](resp_data data) {
std::promise<void> quit_promise{};

client.on_ws_msg([&, send_str](resp_data data) {
if (data.net_err) {
std::cout << "ws_msg net error " << data.net_err.message() << "\n";
if (auto p = weak.lock(); p) {
p->set_value();
quit_promise.set_value();
if (!msg_pair_promise.second) {
msg_pair_promise.first.set_value();
}

return;
}

std::cout << "ws msg len: " << data.resp_body.size() << std::endl;
REQUIRE(data.resp_body.size() == str.size());
CHECK(data.resp_body == str);
if (auto p = weak.lock(); p) {
p->set_value();
}
REQUIRE(data.resp_body.size() == send_str.size());
CHECK(data.resp_body == send_str);
msg_pair_promise.first.set_value();
msg_pair_promise.second = true;
});

auto result = async_simple::coro::syncAwait(client.async_send_ws(str));
CHECK(!result.net_err);

promise->get_future().wait();
async_simple::coro::syncAwait(client.async_send_ws(send_str));
msg_pair_promise.first.get_future().wait();

server.stop();
server_thread.join();

quit_promise.get_future().wait();
}

TEST_CASE("test websocket content lt 126") {
Expand Down Expand Up @@ -226,22 +237,25 @@ TEST_CASE("test send after server stop") {
});
f.wait();

coro_http_client client{};
auto client = std::make_shared<coro_http_client>();
REQUIRE(async_simple::coro::syncAwait(
client.async_ws_connect("ws://localhost:8090")));
client->async_ws_connect("ws://localhost:8090")));

client.on_ws_msg([&client](resp_data data) {
std::promise<void> promise;
client->on_ws_msg([&client, &promise](resp_data data) {
if (data.net_err) {
client.async_close();
client->async_close();
}
promise.set_value();
});

server.stop();

std::this_thread::sleep_for(std::chrono::milliseconds(600));

auto result = async_simple::coro::syncAwait(client.async_send_ws(""));
auto result = async_simple::coro::syncAwait(client->async_send_ws(""));
CHECK(result.net_err);

server_thread.join();
promise.get_future().wait();
}

0 comments on commit 019296a

Please sign in to comment.