Skip to content

Commit

Permalink
[coro_http_server][feat]update http server (#611)
Browse files Browse the repository at this point in the history
  • Loading branch information
qicosmos authored Feb 29, 2024
1 parent 8fceb3d commit 4592e6e
Show file tree
Hide file tree
Showing 11 changed files with 217 additions and 87 deletions.
25 changes: 25 additions & 0 deletions include/ylt/coro_io/coro_io.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

#include <asio/connect.hpp>
#include <asio/dispatch.hpp>
#include <asio/experimental/channel.hpp>
#include <asio/ip/tcp.hpp>
#include <asio/read.hpp>
#include <asio/read_at.hpp>
Expand All @@ -36,6 +37,7 @@
#include <deque>

#include "io_context_pool.hpp"
#include "ylt/util/type_traits.h"

namespace coro_io {

Expand Down Expand Up @@ -347,6 +349,29 @@ post(Func func,
co_return co_await awaitor.await_resume(helper);
}

template <typename T>
async_simple::coro::Lazy<std::error_code> async_send(
asio::experimental::channel<void(std::error_code, T)> &channel, T val) {
callback_awaitor<std::error_code> awaitor;
co_return co_await awaitor.await_resume(
[&, val = std::move(val)](auto handler) {
channel.async_send({}, std::move(val), [handler](const auto &ec) {
handler.set_value_then_resume(ec);
});
});
}

template <typename R>
async_simple::coro::Lazy<std::pair<std::error_code, R>> async_receive(
asio::experimental::channel<void(std::error_code, R)> &channel) {
callback_awaitor<std::pair<std::error_code, R>> awaitor;
co_return co_await awaitor.await_resume([&](auto handler) {
channel.async_receive([handler](auto ec, auto val) {
handler.set_value_then_resume(std::make_pair(ec, std::move(val)));
});
});
}

template <typename Socket, typename AsioBuffer>
std::pair<asio::error_code, size_t> read_some(Socket &sock,
AsioBuffer &&buffer) {
Expand Down
26 changes: 20 additions & 6 deletions include/ylt/coro_io/io_context_pool.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@
#include <thread>
#include <type_traits>
#include <vector>
#include <ylt/easylog.hpp>
#ifdef __linux__
#include <pthread.h>
#include <sched.h>
#endif

namespace coro_io {

Expand Down Expand Up @@ -108,12 +111,12 @@ get_current_executor() {
class io_context_pool {
public:
using executor_type = asio::io_context::executor_type;
explicit io_context_pool(std::size_t pool_size) : next_io_context_(0) {
explicit io_context_pool(std::size_t pool_size, bool cpu_affinity = false)
: next_io_context_(0), cpu_affinity_(cpu_affinity) {
if (pool_size == 0) {
pool_size = 1; // set default value as 1
}

easylog::logger<>::instance();
for (std::size_t i = 0; i < pool_size; ++i) {
io_context_ptr io_context(new asio::io_context(1));
work_ptr work(new asio::io_context::work(*io_context));
Expand Down Expand Up @@ -141,6 +144,16 @@ class io_context_pool {
svr->run();
},
io_contexts_[i]));

#ifdef __linux__
if (cpu_affinity_) {
cpu_set_t cpuset;
CPU_ZERO(&cpuset);
CPU_SET(i, &cpuset);
pthread_setaffinity_np(threads.back()->native_handle(),
sizeof(cpu_set_t), &cpuset);
}
#endif
}

for (std::size_t i = 0; i < threads.size(); ++i) {
Expand Down Expand Up @@ -199,6 +212,7 @@ class io_context_pool {
std::promise<void> promise_;
std::atomic<bool> has_run_or_stop_ = false;
std::once_flag flag_;
bool cpu_affinity_ = false;
};

class multithread_context_pool {
Expand All @@ -211,7 +225,7 @@ class multithread_context_pool {
~multithread_context_pool() { stop(); }

void run() {
for (std::size_t i = 0; i < thd_num_; i++) {
for (int i = 0; i < thd_num_; i++) {
thds_.emplace_back([this] {
ioc_.run();
});
Expand Down Expand Up @@ -248,7 +262,7 @@ template <typename T = io_context_pool>
inline T &g_io_context_pool(
unsigned pool_size = std::thread::hardware_concurrency()) {
static auto _g_io_context_pool = std::make_shared<T>(pool_size);
static bool run_helper = [](auto pool) {
[[maybe_unused]] static bool run_helper = [](auto pool) {
std::thread thrd{[pool] {
pool->run();
}};
Expand All @@ -262,7 +276,7 @@ template <typename T = io_context_pool>
inline T &g_block_io_context_pool(
unsigned pool_size = std::thread::hardware_concurrency()) {
static auto _g_io_context_pool = std::make_shared<T>(pool_size);
static bool run_helper = [](auto pool) {
[[maybe_unused]] static bool run_helper = [](auto pool) {
std::thread thrd{[pool] {
pool->run();
}};
Expand Down
31 changes: 21 additions & 10 deletions include/ylt/thirdparty/cinatra/coro_http_connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -338,18 +338,29 @@ class coro_http_connection
buffers_.clear();
body_.clear();
resp_str_.clear();
multi_buf_ = true;
if (need_shrink_every_time_) {
body_.shrink_to_fit();
}
}
}

async_simple::coro::Lazy<bool> reply(bool need_to_bufffer = true) {
// avoid duplicate reply
if (need_to_bufffer) {
response_.to_buffers(buffers_);
std::error_code ec;
size_t size;
if (multi_buf_) {
if (need_to_bufffer) {
response_.to_buffers(buffers_);
}
std::tie(ec, size) = co_await async_write(buffers_);
}
auto [ec, _] = co_await async_write(buffers_);
else {
if (need_to_bufffer) {
response_.build_resp_str(resp_str_);
}
std::tie(ec, size) = co_await async_write(asio::buffer(resp_str_));
}

if (ec) {
CINATRA_LOG_ERROR << "async_write error: " << ec.message();
close();
Expand Down Expand Up @@ -394,6 +405,8 @@ class coro_http_connection
return ss.str();
}

void set_multi_buf(bool r) { multi_buf_ = r; }

async_simple::coro::Lazy<bool> write_data(std::string_view message) {
std::vector<asio::const_buffer> buffers;
buffers.push_back(asio::buffer(message));
Expand Down Expand Up @@ -761,13 +774,10 @@ class coro_http_connection

private:
bool check_keep_alive() {
bool keep_alive = true;
auto val = request_.get_header_value("connection");
if (!val.empty() && iequal0(val, "close")) {
keep_alive = false;
if (parser_.has_close()) {
return false;
}

return keep_alive;
return true;
}

void build_ws_handshake_head() {
Expand Down Expand Up @@ -823,5 +833,6 @@ class coro_http_connection
bool use_ssl_ = false;
#endif
bool need_shrink_every_time_ = false;
bool multi_buf_ = true;
};
} // namespace cinatra
6 changes: 1 addition & 5 deletions include/ylt/thirdparty/cinatra/coro_http_request.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -188,17 +188,13 @@ class coro_http_request {
coro_http_connection *get_conn() { return conn_; }

bool is_upgrade() {
auto h = get_header_value("Connection");
if (h.empty())
if (!parser_.has_upgrade())
return false;

auto u = get_header_value("Upgrade");
if (u.empty())
return false;

if (h != UPGRADE)
return false;

if (u != WEBSOCKET)
return false;

Expand Down
1 change: 1 addition & 0 deletions include/ylt/thirdparty/cinatra/coro_http_response.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ class coro_http_response {

status_type status() { return status_; }
std::string_view content() { return content_; }
size_t content_size() { return content_.size(); }

void add_header(auto k, auto v) {
resp_headers_.emplace_back(resp_header{std::move(k), std::move(v)});
Expand Down
14 changes: 4 additions & 10 deletions include/ylt/thirdparty/cinatra/coro_http_router.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,9 @@ class coro_http_router {
}

if (whole_str.find(":") != std::string::npos) {
std::vector<std::string> coro_method_names = {};
std::string coro_method_str;
coro_method_str.append(method_name);
coro_method_names.push_back(coro_method_str);
std::string method_str(method_name);
coro_router_tree_->coro_insert(key, std::move(http_handler),
coro_method_names);
method_str);
}
else {
if (whole_str.find("{") != std::string::npos ||
Expand Down Expand Up @@ -138,11 +135,8 @@ class coro_http_router {
}

if (whole_str.find(':') != std::string::npos) {
std::vector<std::string> method_names = {};
std::string method_str;
method_str.append(method_name);
method_names.push_back(method_str);
router_tree_->insert(whole_str, std::move(http_handler), method_names);
std::string method_str(method_name);
router_tree_->insert(whole_str, std::move(http_handler), method_str);
}
else if (whole_str.find("{") != std::string::npos ||
whole_str.find(")") != std::string::npos) {
Expand Down
6 changes: 4 additions & 2 deletions include/ylt/thirdparty/cinatra/coro_http_server.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,10 @@ class coro_http_server {
coro_http_server(asio::io_context &ctx, unsigned short port)
: out_ctx_(&ctx), port_(port), acceptor_(ctx), check_timer_(ctx) {}

coro_http_server(size_t thread_num, unsigned short port)
: pool_(std::make_unique<coro_io::io_context_pool>(thread_num)),
coro_http_server(size_t thread_num, unsigned short port,
bool cpu_affinity = false)
: pool_(std::make_unique<coro_io::io_context_pool>(thread_num,
cpu_affinity)),
port_(port),
acceptor_(pool_->get_executor()->get_asio_executor()),
check_timer_(pool_->get_executor()->get_asio_executor()) {}
Expand Down
Loading

0 comments on commit 4592e6e

Please sign in to comment.