diff --git a/include/ylt/coro_io/coro_io.hpp b/include/ylt/coro_io/coro_io.hpp index a7e4302f3..c2bb0479c 100644 --- a/include/ylt/coro_io/coro_io.hpp +++ b/include/ylt/coro_io/coro_io.hpp @@ -26,6 +26,7 @@ #include #include +#include #include #include #include @@ -36,6 +37,7 @@ #include #include "io_context_pool.hpp" +#include "ylt/util/type_traits.h" namespace coro_io { @@ -347,6 +349,29 @@ post(Func func, co_return co_await awaitor.await_resume(helper); } +template +async_simple::coro::Lazy async_send( + asio::experimental::channel &channel, T val) { + callback_awaitor awaitor; + co_return co_await awaitor.await_resume( + [&, val = std::move(val)](auto handler) { + channel.async_send({}, std::move(val), [handler](const auto &ec) { + handler.set_value_then_resume(ec); + }); + }); +} + +template +async_simple::coro::Lazy> async_receive( + asio::experimental::channel &channel) { + callback_awaitor> awaitor; + co_return co_await awaitor.await_resume([&](auto handler) { + channel.async_receive([handler](auto ec, auto val) { + handler.set_value_then_resume(std::make_pair(ec, std::move(val))); + }); + }); +} + template std::pair read_some(Socket &sock, AsioBuffer &&buffer) { diff --git a/include/ylt/coro_io/io_context_pool.hpp b/include/ylt/coro_io/io_context_pool.hpp index e10475503..81bce7d4a 100644 --- a/include/ylt/coro_io/io_context_pool.hpp +++ b/include/ylt/coro_io/io_context_pool.hpp @@ -27,7 +27,10 @@ #include #include #include -#include +#ifdef __linux__ +#include +#include +#endif namespace coro_io { @@ -108,12 +111,12 @@ get_current_executor() { class io_context_pool { public: using executor_type = asio::io_context::executor_type; - explicit io_context_pool(std::size_t pool_size) : next_io_context_(0) { + explicit io_context_pool(std::size_t pool_size, bool cpu_affinity = false) + : next_io_context_(0), cpu_affinity_(cpu_affinity) { if (pool_size == 0) { pool_size = 1; // set default value as 1 } - easylog::logger<>::instance(); for (std::size_t i = 0; i < pool_size; ++i) { io_context_ptr io_context(new asio::io_context(1)); work_ptr work(new asio::io_context::work(*io_context)); @@ -141,6 +144,16 @@ class io_context_pool { svr->run(); }, io_contexts_[i])); + +#ifdef __linux__ + if (cpu_affinity_) { + cpu_set_t cpuset; + CPU_ZERO(&cpuset); + CPU_SET(i, &cpuset); + pthread_setaffinity_np(threads.back()->native_handle(), + sizeof(cpu_set_t), &cpuset); + } +#endif } for (std::size_t i = 0; i < threads.size(); ++i) { @@ -199,6 +212,7 @@ class io_context_pool { std::promise promise_; std::atomic has_run_or_stop_ = false; std::once_flag flag_; + bool cpu_affinity_ = false; }; class multithread_context_pool { @@ -211,7 +225,7 @@ class multithread_context_pool { ~multithread_context_pool() { stop(); } void run() { - for (std::size_t i = 0; i < thd_num_; i++) { + for (int i = 0; i < thd_num_; i++) { thds_.emplace_back([this] { ioc_.run(); }); @@ -248,7 +262,7 @@ template inline T &g_io_context_pool( unsigned pool_size = std::thread::hardware_concurrency()) { static auto _g_io_context_pool = std::make_shared(pool_size); - static bool run_helper = [](auto pool) { + [[maybe_unused]] static bool run_helper = [](auto pool) { std::thread thrd{[pool] { pool->run(); }}; @@ -262,7 +276,7 @@ template inline T &g_block_io_context_pool( unsigned pool_size = std::thread::hardware_concurrency()) { static auto _g_io_context_pool = std::make_shared(pool_size); - static bool run_helper = [](auto pool) { + [[maybe_unused]] static bool run_helper = [](auto pool) { std::thread thrd{[pool] { pool->run(); }}; diff --git a/include/ylt/thirdparty/cinatra/coro_http_connection.hpp b/include/ylt/thirdparty/cinatra/coro_http_connection.hpp index 92aeaeb24..f05903796 100644 --- a/include/ylt/thirdparty/cinatra/coro_http_connection.hpp +++ b/include/ylt/thirdparty/cinatra/coro_http_connection.hpp @@ -338,6 +338,7 @@ class coro_http_connection buffers_.clear(); body_.clear(); resp_str_.clear(); + multi_buf_ = true; if (need_shrink_every_time_) { body_.shrink_to_fit(); } @@ -345,11 +346,21 @@ class coro_http_connection } async_simple::coro::Lazy reply(bool need_to_bufffer = true) { - // avoid duplicate reply - if (need_to_bufffer) { - response_.to_buffers(buffers_); + std::error_code ec; + size_t size; + if (multi_buf_) { + if (need_to_bufffer) { + response_.to_buffers(buffers_); + } + std::tie(ec, size) = co_await async_write(buffers_); } - auto [ec, _] = co_await async_write(buffers_); + else { + if (need_to_bufffer) { + response_.build_resp_str(resp_str_); + } + std::tie(ec, size) = co_await async_write(asio::buffer(resp_str_)); + } + if (ec) { CINATRA_LOG_ERROR << "async_write error: " << ec.message(); close(); @@ -394,6 +405,8 @@ class coro_http_connection return ss.str(); } + void set_multi_buf(bool r) { multi_buf_ = r; } + async_simple::coro::Lazy write_data(std::string_view message) { std::vector buffers; buffers.push_back(asio::buffer(message)); @@ -761,13 +774,10 @@ class coro_http_connection private: bool check_keep_alive() { - bool keep_alive = true; - auto val = request_.get_header_value("connection"); - if (!val.empty() && iequal0(val, "close")) { - keep_alive = false; + if (parser_.has_close()) { + return false; } - - return keep_alive; + return true; } void build_ws_handshake_head() { @@ -823,5 +833,6 @@ class coro_http_connection bool use_ssl_ = false; #endif bool need_shrink_every_time_ = false; + bool multi_buf_ = true; }; } // namespace cinatra diff --git a/include/ylt/thirdparty/cinatra/coro_http_request.hpp b/include/ylt/thirdparty/cinatra/coro_http_request.hpp index fa45b9cb2..f5788d31b 100644 --- a/include/ylt/thirdparty/cinatra/coro_http_request.hpp +++ b/include/ylt/thirdparty/cinatra/coro_http_request.hpp @@ -188,17 +188,13 @@ class coro_http_request { coro_http_connection *get_conn() { return conn_; } bool is_upgrade() { - auto h = get_header_value("Connection"); - if (h.empty()) + if (!parser_.has_upgrade()) return false; auto u = get_header_value("Upgrade"); if (u.empty()) return false; - if (h != UPGRADE) - return false; - if (u != WEBSOCKET) return false; diff --git a/include/ylt/thirdparty/cinatra/coro_http_response.hpp b/include/ylt/thirdparty/cinatra/coro_http_response.hpp index e3c73e8ac..0d1d61acc 100644 --- a/include/ylt/thirdparty/cinatra/coro_http_response.hpp +++ b/include/ylt/thirdparty/cinatra/coro_http_response.hpp @@ -94,6 +94,7 @@ class coro_http_response { status_type status() { return status_; } std::string_view content() { return content_; } + size_t content_size() { return content_.size(); } void add_header(auto k, auto v) { resp_headers_.emplace_back(resp_header{std::move(k), std::move(v)}); diff --git a/include/ylt/thirdparty/cinatra/coro_http_router.hpp b/include/ylt/thirdparty/cinatra/coro_http_router.hpp index a620ab2c7..d60b3dcfc 100644 --- a/include/ylt/thirdparty/cinatra/coro_http_router.hpp +++ b/include/ylt/thirdparty/cinatra/coro_http_router.hpp @@ -88,12 +88,9 @@ class coro_http_router { } if (whole_str.find(":") != std::string::npos) { - std::vector coro_method_names = {}; - std::string coro_method_str; - coro_method_str.append(method_name); - coro_method_names.push_back(coro_method_str); + std::string method_str(method_name); coro_router_tree_->coro_insert(key, std::move(http_handler), - coro_method_names); + method_str); } else { if (whole_str.find("{") != std::string::npos || @@ -138,11 +135,8 @@ class coro_http_router { } if (whole_str.find(':') != std::string::npos) { - std::vector method_names = {}; - std::string method_str; - method_str.append(method_name); - method_names.push_back(method_str); - router_tree_->insert(whole_str, std::move(http_handler), method_names); + std::string method_str(method_name); + router_tree_->insert(whole_str, std::move(http_handler), method_str); } else if (whole_str.find("{") != std::string::npos || whole_str.find(")") != std::string::npos) { diff --git a/include/ylt/thirdparty/cinatra/coro_http_server.hpp b/include/ylt/thirdparty/cinatra/coro_http_server.hpp index 01d57e62b..9f9ae0b97 100644 --- a/include/ylt/thirdparty/cinatra/coro_http_server.hpp +++ b/include/ylt/thirdparty/cinatra/coro_http_server.hpp @@ -30,8 +30,10 @@ class coro_http_server { coro_http_server(asio::io_context &ctx, unsigned short port) : out_ctx_(&ctx), port_(port), acceptor_(ctx), check_timer_(ctx) {} - coro_http_server(size_t thread_num, unsigned short port) - : pool_(std::make_unique(thread_num)), + coro_http_server(size_t thread_num, unsigned short port, + bool cpu_affinity = false) + : pool_(std::make_unique(thread_num, + cpu_affinity)), port_(port), acceptor_(pool_->get_executor()->get_asio_executor()), check_timer_(pool_->get_executor()->get_asio_executor()) {} diff --git a/include/ylt/thirdparty/cinatra/coro_radix_tree.hpp b/include/ylt/thirdparty/cinatra/coro_radix_tree.hpp index 585446b87..14f76f22c 100644 --- a/include/ylt/thirdparty/cinatra/coro_radix_tree.hpp +++ b/include/ylt/thirdparty/cinatra/coro_radix_tree.hpp @@ -42,8 +42,8 @@ struct coro_handler_t { struct radix_tree_node { std::string path; - std::vector handlers; - std::vector coro_handlers; + handler_t handler; + coro_handler_t coro_handler; std::string indices; std::vector> children; int max_params; @@ -54,21 +54,18 @@ struct radix_tree_node { std::function get_handler(const std::string &method) { - for (auto &h : this->handlers) { - if (h.method == method) { - return h.handler; - } + if (handler.method == method) { + return handler.handler; } + return nullptr; } std::function(coro_http_request &req, coro_http_response &resp)> get_coro_handler(const std::string &method) { - for (auto &h : this->coro_handlers) { - if (h.method == method) { - return h.coro_handler; - } + if (coro_handler.method == method) { + return coro_handler.coro_handler; } return nullptr; } @@ -76,22 +73,17 @@ struct radix_tree_node { int add_handler( std::function handler, - const std::vector &methods) { - for (auto &m : methods) { - auto old_handler = this->get_handler(m); - this->handlers.push_back(handler_t{m, handler}); - } + const std::string &method) { + this->handler = handler_t{method, handler}; + return 0; } int add_coro_handler(std::function( coro_http_request &req, coro_http_response &resp)> coro_handler, - const std::vector &methods) { - for (auto &m : methods) { - auto old_coro_handler = this->get_coro_handler(m); - this->coro_handlers.push_back(coro_handler_t{m, coro_handler}); - } + const std::string &method) { + this->coro_handler = coro_handler_t{method, coro_handler}; return 0; } @@ -134,7 +126,7 @@ class radix_tree { const std::string &path, std::function handler, - const std::vector &methods) { + const std::string &method) { auto root = this->root; int i = 0, n = path.size(), param_count = 0, code = 0; while (i < n) { @@ -166,7 +158,7 @@ class radix_tree { ++param_count; } - code = root->add_handler(handler, methods); + code = root->add_handler(handler, method); break; } @@ -181,7 +173,7 @@ class radix_tree { ++param_count; if (i == n) { - code = root->add_handler(handler, methods); + code = root->add_handler(handler, method); break; } } @@ -193,7 +185,7 @@ class radix_tree { i += root->path.size() + 1; if (i == n) { - code = root->add_handler(handler, methods); + code = root->add_handler(handler, method); break; } } @@ -207,18 +199,18 @@ class radix_tree { if (j < m) { std::shared_ptr child( std::make_shared(root->path.substr(j))); - child->handlers = root->handlers; + child->handler = root->handler; child->indices = root->indices; child->children = root->children; root->path = root->path.substr(0, j); - root->handlers = {}; + root->handler = {}; root->indices = child->path[0]; root->children = {child}; } if (i == n) { - code = root->add_handler(handler, methods); + code = root->add_handler(handler, method); break; } } @@ -235,7 +227,7 @@ class radix_tree { std::function( coro_http_request &req, coro_http_response &resp)> coro_handler, - const std::vector &methods) { + std::string &method) { auto root = this->root; int i = 0, n = path.size(), param_count = 0, code = 0; while (i < n) { @@ -267,7 +259,7 @@ class radix_tree { ++param_count; } - code = root->add_coro_handler(coro_handler, methods); + code = root->add_coro_handler(coro_handler, method); break; } @@ -282,7 +274,7 @@ class radix_tree { ++param_count; if (i == n) { - code = root->add_coro_handler(coro_handler, methods); + code = root->add_coro_handler(coro_handler, method); break; } } @@ -294,7 +286,7 @@ class radix_tree { i += root->path.size() + 1; if (i == n) { - code = root->add_coro_handler(coro_handler, methods); + code = root->add_coro_handler(coro_handler, method); break; } } @@ -308,18 +300,18 @@ class radix_tree { if (j < m) { std::shared_ptr child( std::make_shared(root->path.substr(j))); - child->handlers = root->handlers; + child->handler = root->handler; child->indices = root->indices; child->children = root->children; root->path = root->path.substr(0, j); - root->handlers = {}; + root->handler = {}; root->indices = child->path[0]; root->children = {child}; } if (i == n) { - code = root->add_coro_handler(coro_handler, methods); + code = root->add_coro_handler(coro_handler, method); break; } } diff --git a/include/ylt/thirdparty/cinatra/http_parser.hpp b/include/ylt/thirdparty/cinatra/http_parser.hpp index 6c2fa898b..15e1e2f55 100644 --- a/include/ylt/thirdparty/cinatra/http_parser.hpp +++ b/include/ylt/thirdparty/cinatra/http_parser.hpp @@ -8,6 +8,7 @@ #include #include "cinatra_log_wrapper.hpp" +#include "define.h" #include "picohttpparser.h" #include "url_encode_decode.hpp" @@ -64,9 +65,12 @@ class http_parser { size_t method_len; const char *url; size_t url_len; + + bool has_query{}; header_len_ = detail::phr_parse_request( data, size, &method, &method_len, &url, &url_len, &minor_version, - headers_.data(), &num_headers_, last_len); + headers_.data(), &num_headers_, last_len, has_connection_, has_close_, + has_upgrade_, has_query); if (header_len_ < 0) [[unlikely]] { CINATRA_LOG_WARNING << "parse http head failed"; @@ -76,21 +80,28 @@ class http_parser { << ", you can define macro " "CINATRA_MAX_HTTP_HEADER_FIELD_SIZE to expand it."; } + return header_len_; } method_ = {method, method_len}; url_ = {url, url_len}; - auto content_len = this->get_header_value("content-length"sv); - if (content_len.empty()) { + auto methd_type = method_type(method_); + if (methd_type == http_method::GET || methd_type == http_method::HEAD) { body_len_ = 0; } else { - body_len_ = atoi(content_len.data()); + auto content_len = this->get_header_value("content-length"sv); + if (content_len.empty()) { + body_len_ = 0; + } + else { + body_len_ = atoi(content_len.data()); + } } - size_t pos = url_.find('?'); - if (pos != std::string_view::npos) { + if (has_query) { + size_t pos = url_.find('?'); parse_query(url_.substr(pos + 1, url_len - pos - 1)); url_ = {url, pos}; } @@ -98,6 +109,12 @@ class http_parser { return header_len_; } + bool has_connection() { return has_connection_; } + + bool has_close() { return has_close_; } + + bool has_upgrade() { return has_upgrade_; } + std::string_view get_header_value(std::string_view key) const { for (size_t i = 0; i < num_headers_; i++) { if (iequal0(headers_[i].name, key)) @@ -247,6 +264,9 @@ class http_parser { size_t num_headers_ = 0; int header_len_ = 0; int body_len_ = 0; + bool has_connection_{}; + bool has_close_{}; + bool has_upgrade_{}; std::array headers_; std::string_view method_; std::string_view url_; diff --git a/include/ylt/thirdparty/cinatra/picohttpparser.h b/include/ylt/thirdparty/cinatra/picohttpparser.h index 031480cf5..310044b3d 100644 --- a/include/ylt/thirdparty/cinatra/picohttpparser.h +++ b/include/ylt/thirdparty/cinatra/picohttpparser.h @@ -808,7 +808,9 @@ static const char *parse_headers(const char *buf, const char *buf_end, static const char *parse_headers(const char *buf, const char *buf_end, http_header *headers, size_t *num_headers, - size_t max_headers, int *ret) { + size_t max_headers, int *ret, + bool &has_connection, bool &has_close, + bool &has_upgrade) { for (;; ++*num_headers) { const char *name; size_t name_len; @@ -877,6 +879,21 @@ static const char *parse_headers(const char *buf, const char *buf_end, NULL) { return NULL; } + if (name_len == 10) { + if (memcmp(name + 1, "onnection", name_len - 1) == 0) { + // has connection + has_connection = true; + char ch = *value; + if (ch == 'U') { + // has upgrade + has_upgrade = true; + } + else if (ch == 'c' || ch == 'C') { + // has_close + has_close = true; + } + } + } headers[*num_headers] = {std::string_view{name, name_len}, std::string_view{value, value_len}}; } @@ -885,12 +902,40 @@ static const char *parse_headers(const char *buf, const char *buf_end, #endif -static const char *parse_request(const char *buf, const char *buf_end, - const char **method, size_t *method_len, - const char **path, size_t *path_len, - int *minor_version, http_header *headers, - size_t *num_headers, size_t max_headers, - int *ret) { +#define ADVANCE_PATH(tok, toklen, has_query) \ + do { \ + const char *tok_start = buf; \ + static const char ALIGNED(16) ranges2[] = "\000\040\177\177"; \ + int found2; \ + buf = findchar_fast(buf, buf_end, ranges2, sizeof(ranges2) - 1, &found2); \ + if (!found2) { \ + CHECK_EOF(); \ + } \ + while (1) { \ + if (*buf == ' ') { \ + break; \ + } \ + else if (unlikely(!IS_PRINTABLE_ASCII(*buf))) { \ + if ((unsigned char)*buf < '\040' || *buf == '\177') { \ + *ret = -1; \ + return NULL; \ + } \ + } \ + else if (unlikely(*buf == '?')) { \ + has_query = true; \ + } \ + ++buf; \ + CHECK_EOF(); \ + } \ + tok = tok_start; \ + toklen = buf - tok_start; \ + } while (0) + +static const char *parse_request( + const char *buf, const char *buf_end, const char **method, + size_t *method_len, const char **path, size_t *path_len, int *minor_version, + http_header *headers, size_t *num_headers, size_t max_headers, int *ret, + bool &has_connection, bool &has_close, bool &has_upgrade, bool &has_query) { /* skip first empty line (some clients add CRLF after POST content) */ CHECK_EOF(); if (*buf == '\015') { @@ -904,7 +949,7 @@ static const char *parse_request(const char *buf, const char *buf_end, /* parse request line */ ADVANCE_TOKEN(*method, *method_len); ++buf; - ADVANCE_TOKEN(*path, *path_len); + ADVANCE_PATH(*path, *path_len, has_query); ++buf; if ((buf = parse_http_version(buf, buf_end, minor_version, ret)) == NULL) { return NULL; @@ -921,14 +966,17 @@ static const char *parse_request(const char *buf, const char *buf_end, return NULL; } - return parse_headers(buf, buf_end, headers, num_headers, max_headers, ret); + return parse_headers(buf, buf_end, headers, num_headers, max_headers, ret, + has_connection, has_close, has_upgrade); } inline int phr_parse_request(const char *buf_start, size_t len, const char **method, size_t *method_len, const char **path, size_t *path_len, int *minor_version, http_header *headers, - size_t *num_headers, size_t last_len) { + size_t *num_headers, size_t last_len, + bool &has_connection, bool &has_close, + bool &has_upgrade, bool &has_query) { const char *buf = buf_start, *buf_end = buf_start + len; size_t max_headers = *num_headers; int r; @@ -948,7 +996,8 @@ inline int phr_parse_request(const char *buf_start, size_t len, if ((buf = parse_request(buf + last_len, buf_end, method, method_len, path, path_len, minor_version, headers, num_headers, - max_headers, &r)) == NULL) { + max_headers, &r, has_connection, has_close, + has_upgrade, has_query)) == NULL) { return r; } @@ -987,7 +1036,10 @@ inline const char *parse_response(const char *buf, const char *buf_end, return NULL; } - return parse_headers(buf, buf_end, headers, num_headers, max_headers, ret); + bool has_connection, has_close, has_upgrade; + + return parse_headers(buf, buf_end, headers, num_headers, max_headers, ret, + has_connection, has_close, has_upgrade); } inline int phr_parse_response(const char *buf_start, size_t len, @@ -1033,8 +1085,9 @@ inline int phr_parse_headers(const char *buf_start, size_t len, return r; } - if ((buf = parse_headers(buf, buf_end, headers, num_headers, max_headers, - &r)) == NULL) { + bool has_connection, has_close, has_upgrade; + if ((buf = parse_headers(buf, buf_end, headers, num_headers, max_headers, &r, + has_connection, has_close, has_upgrade)) == NULL) { return r; } diff --git a/src/coro_http/examples/example.cpp b/src/coro_http/examples/example.cpp index 3196b36bf..35356dede 100644 --- a/src/coro_http/examples/example.cpp +++ b/src/coro_http/examples/example.cpp @@ -543,6 +543,27 @@ void http_proxy() { assert(!resp_random.resp_body.empty()); } +void coro_channel() { + auto ctx = coro_io::get_global_block_executor()->get_asio_executor(); + asio::experimental::channel ch(ctx, 10000); + auto ec = async_simple::coro::syncAwait(coro_io::async_send(ch, 41)); + assert(!ec); + ec = async_simple::coro::syncAwait(coro_io::async_send(ch, 42)); + assert(!ec); + + std::error_code err; + int val; + std::tie(err, val) = + async_simple::coro::syncAwait(coro_io::async_receive(ch)); + assert(!err); + assert(val == 41); + + std::tie(err, val) = + async_simple::coro::syncAwait(coro_io::async_receive(ch)); + assert(!err); + assert(val == 42); +} + int main() { async_simple::coro::syncAwait(basic_usage()); async_simple::coro::syncAwait(use_aspects()); @@ -554,5 +575,6 @@ int main() { test_gzip(); #endif http_proxy(); + coro_channel(); return 0; } \ No newline at end of file