diff --git a/include/ylt/coro_http/coro_http_server.hpp b/include/ylt/coro_http/coro_http_server.hpp new file mode 100644 index 000000000..84373040f --- /dev/null +++ b/include/ylt/coro_http/coro_http_server.hpp @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2023, Alibaba Group Holding Limited; + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once +#ifdef YLT_ENABLE_SSL +#define CINATRA_ENABLE_SSL +#endif +#include +#define CINATRA_LOG_ERROR ELOG_ERROR +#define CINATRA_LOG_WARNING ELOG_WARN +#define CINATRA_LOG_INFO ELOG_INFO +#define CINATRA_LOG_DEBUG ELOG_DEBUG +#define CINATRA_LOG_TRACE ELOG_TRACE + +#include + +namespace coro_http { +using coro_http_server = cinatra::coro_http_server; +using coro_http_request = cinatra::coro_http_request; +using coro_http_response = cinatra::coro_http_response; +using status_type = cinatra::status_type; +using http_method = cinatra::http_method; +constexpr auto GET = cinatra::GET; +constexpr auto POST = cinatra::POST; +} // namespace coro_http \ No newline at end of file diff --git a/include/ylt/coro_io/coro_io.hpp b/include/ylt/coro_io/coro_io.hpp index e20f94b59..058ffb730 100644 --- a/include/ylt/coro_io/coro_io.hpp +++ b/include/ylt/coro_io/coro_io.hpp @@ -221,7 +221,7 @@ inline async_simple::coro::Lazy async_connect( template inline async_simple::coro::Lazy async_close(Socket &socket) noexcept { callback_awaitor awaitor; - auto &executor = socket.get_executor(); + auto executor = socket.get_executor(); co_return co_await awaitor.await_resume([&](auto handler) { asio::post(executor, [&, handler]() { asio::error_code ignored_ec; @@ -282,6 +282,22 @@ inline async_simple::coro::Lazy sleep_for(Duration d) { } } +template +inline async_simple::coro::Lazy post( + Func func, + coro_io::ExecutorWrapper<> *e = coro_io::get_global_block_executor()) { + callback_awaitor awaitor; + + co_return co_await awaitor.await_resume( + [e, func = std::move(func)](auto handler) { + auto executor = e->get_asio_executor(); + asio::post(executor, [=, func = std::move(func)]() { + func(); + handler.resume(); + }); + }); +} + template std::pair read_some(Socket &sock, AsioBuffer &&buffer) { diff --git a/include/ylt/coro_io/io_context_pool.hpp b/include/ylt/coro_io/io_context_pool.hpp index e9f38493b..e10475503 100644 --- a/include/ylt/coro_io/io_context_pool.hpp +++ b/include/ylt/coro_io/io_context_pool.hpp @@ -115,7 +115,7 @@ class io_context_pool { easylog::logger<>::instance(); for (std::size_t i = 0; i < pool_size; ++i) { - io_context_ptr io_context(new asio::io_context); + io_context_ptr io_context(new asio::io_context(1)); work_ptr work(new asio::io_context::work(*io_context)); io_contexts_.push_back(io_context); auto executor = std::make_unique>( diff --git a/include/ylt/thirdparty/cinatra/coro_http_connection.hpp b/include/ylt/thirdparty/cinatra/coro_http_connection.hpp new file mode 100644 index 000000000..67b7a1f29 --- /dev/null +++ b/include/ylt/thirdparty/cinatra/coro_http_connection.hpp @@ -0,0 +1,379 @@ +#pragma once +#include +#include + +#include +#include +#include + +#include "asio/dispatch.hpp" +#include "asio/streambuf.hpp" +#include "async_simple/coro/Lazy.h" +#include "cinatra/cinatra_log_wrapper.hpp" +#include "cinatra/response_cv.hpp" +#include "coro_http_request.hpp" +#include "coro_http_router.hpp" +#include "define.h" +#include "http_parser.hpp" +#include "string_resize.hpp" +#include "ylt/coro_io/coro_io.hpp" + +namespace cinatra { +struct chunked_result { + std::error_code ec; + bool eof = false; + std::string_view data; +}; + +class coro_http_connection + : public std::enable_shared_from_this { + public: + template + coro_http_connection(executor_t *executor, asio::ip::tcp::socket socket) + : executor_(executor), + socket_(std::move(socket)), + request_(parser_, this), + response_(this) { + buffers_.reserve(3); + } + + ~coro_http_connection() { close(); } + +#ifdef CINATRA_ENABLE_SSL + bool init_ssl(const std::string &cert_file, const std::string &key_file, + std::string passwd) { + unsigned long ssl_options = asio::ssl::context::default_workarounds | + asio::ssl::context::no_sslv2 | + asio::ssl::context::single_dh_use; + try { + ssl_ctx_ = + std::make_unique(asio::ssl::context::sslv23); + + ssl_ctx_->set_options(ssl_options); + if (!passwd.empty()) { + ssl_ctx_->set_password_callback([pwd = std::move(passwd)](auto, auto) { + return pwd; + }); + } + + std::error_code ec; + if (fs::exists(cert_file, ec)) { + ssl_ctx_->use_certificate_chain_file(std::move(cert_file)); + } + + if (fs::exists(key_file, ec)) { + ssl_ctx_->use_private_key_file(std::move(key_file), + asio::ssl::context::pem); + } + + ssl_stream_ = + std::make_unique>( + socket_, *ssl_ctx_); + use_ssl_ = true; + } catch (const std::exception &e) { + CINATRA_LOG_ERROR << "init ssl failed, reason: " << e.what(); + return false; + } + return true; + } +#endif + + async_simple::coro::Lazy start() { + bool has_shake = false; + while (true) { +#ifdef CINATRA_ENABLE_SSL + if (use_ssl_ && !has_shake) { + auto ec = co_await coro_io::async_handshake( + ssl_stream_, asio::ssl::stream_base::server); + if (ec) { + CINATRA_LOG_ERROR << "handle_shake error: " << ec.message(); + close(); + break; + } + + has_shake = true; + } +#endif + auto [ec, size] = co_await async_read_until(head_buf_, TWO_CRCF); + if (ec) { + CINATRA_LOG_ERROR << "read http header error: " << ec.message(); + close(); + break; + } + + const char *data_ptr = asio::buffer_cast(head_buf_.data()); + int head_len = parser_.parse_request(data_ptr, size, 0); + if (head_len <= 0) { + CINATRA_LOG_ERROR << "parse http header error"; + close(); + break; + } + + head_buf_.consume(size); + keep_alive_ = check_keep_alive(); + + bool is_chunked = parser_.is_chunked(); + + if (!is_chunked) { + size_t body_len = parser_.body_len(); + if (body_len <= head_buf_.size()) { + if (body_len > 0) { + detail::resize(body_, body_len); + auto data_ptr = asio::buffer_cast(head_buf_.data()); + memcpy(body_.data(), data_ptr, body_len); + head_buf_.consume(head_buf_.size()); + } + } + else { + size_t part_size = head_buf_.size(); + size_t size_to_read = body_len - part_size; + auto data_ptr = asio::buffer_cast(head_buf_.data()); + detail::resize(body_, body_len); + memcpy(body_.data(), data_ptr, part_size); + head_buf_.consume(part_size); + + auto [ec, size] = co_await async_read( + asio::buffer(body_.data() + part_size, size_to_read), + size_to_read); + if (ec) { + CINATRA_LOG_ERROR << "async_read error: " << ec.message(); + close(); + break; + } + } + } + + std::string_view key = { + parser_.method().data(), + parser_.method().length() + 1 + parser_.url().length()}; + + if (!body_.empty()) { + request_.set_body(body_); + } + + auto &router = coro_http_router::instance(); + if (auto handler = router.get_handler(key); handler) { + router.route(handler, request_, response_); + } + else { + if (auto coro_handler = router.get_coro_handler(key); coro_handler) { + co_await router.route_coro(coro_handler, request_, response_); + } + else { + // not found + response_.set_status(status_type::not_found); + } + } + + if (!response_.get_delay()) { + co_await reply(); + } + + response_.clear(); + buffers_.clear(); + body_.clear(); + } + } + + async_simple::coro::Lazy reply(bool need_to_bufffer = true) { + // avoid duplicate reply + if (need_to_bufffer) { + response_.to_buffers(buffers_); + } + auto [ec, _] = co_await async_write(buffers_); + if (ec) { + CINATRA_LOG_ERROR << "async_write error: " << ec.message(); + close(); + co_return false; + } + + if (!keep_alive_) { + // now in io thread, so can close socket immediately. + close(); + } + + co_return true; + } + + bool sync_reply() { return async_simple::coro::syncAwait(reply()); } + + async_simple::coro::Lazy begin_chunked() { + response_.set_delay(true); + response_.set_status(status_type::ok); + co_return co_await reply(); + } + + async_simple::coro::Lazy write_chunked(std::string_view chunked_data, + bool eof = false) { + response_.set_delay(true); + buffers_.clear(); + response_.to_chunked_buffers(buffers_, chunked_data, eof); + co_return co_await reply(false); + } + + async_simple::coro::Lazy end_chunked() { + co_return co_await write_chunked("", true); + } + + async_simple::coro::Lazy read_chunked() { + if (head_buf_.size() > 0) { + const char *data_ptr = asio::buffer_cast(head_buf_.data()); + chunked_buf_.sputn(data_ptr, head_buf_.size()); + head_buf_.consume(head_buf_.size()); + } + + chunked_result result{}; + std::error_code ec{}; + size_t size = 0; + + if (std::tie(ec, size) = co_await async_read_until(chunked_buf_, CRCF); + ec) { + result.ec = ec; + close(); + co_return result; + } + + size_t buf_size = chunked_buf_.size(); + size_t additional_size = buf_size - size; + const char *data_ptr = asio::buffer_cast(chunked_buf_.data()); + std::string_view size_str(data_ptr, size - CRCF.size()); + size_t chunk_size; + auto [ptr, err] = std::from_chars( + size_str.data(), size_str.data() + size_str.size(), chunk_size, 16); + if (err != std::errc{}) { + CINATRA_LOG_ERROR << "bad chunked size"; + result.ec = std::make_error_code(std::errc::invalid_argument); + co_return result; + } + + chunked_buf_.consume(size); + + if (chunk_size == 0) { + // all finished, no more data + chunked_buf_.consume(CRCF.size()); + result.eof = true; + co_return result; + } + + if (additional_size < size_t(chunk_size + 2)) { + // not a complete chunk, read left chunk data. + size_t size_to_read = chunk_size + 2 - additional_size; + if (std::tie(ec, size) = co_await async_read(chunked_buf_, size_to_read); + ec) { + result.ec = ec; + close(); + co_return result; + } + } + + data_ptr = asio::buffer_cast(chunked_buf_.data()); + result.data = std::string_view{data_ptr, (size_t)chunk_size}; + chunked_buf_.consume(chunk_size + CRCF.size()); + + co_return result; + } + + auto &tcp_socket() { return socket_; } + + void set_quit_callback(std::function callback, + uint64_t conn_id) { + quit_cb_ = std::move(callback); + conn_id_ = conn_id; + } + + template + async_simple::coro::Lazy> async_read( + AsioBuffer &&buffer, size_t size_to_read) noexcept { +#ifdef CINATRA_ENABLE_SSL + if (use_ssl_) { + return coro_io::async_read(*ssl_stream_, buffer, size_to_read); + } + else { +#endif + return coro_io::async_read(socket_, buffer, size_to_read); +#ifdef CINATRA_ENABLE_SSL + } +#endif + } + + template + async_simple::coro::Lazy> async_write( + AsioBuffer &&buffer) { +#ifdef CINATRA_ENABLE_SSL + if (use_ssl_) { + return coro_io::async_write(*ssl_stream_, buffer); + } + else { +#endif + return coro_io::async_write(socket_, buffer); +#ifdef CINATRA_ENABLE_SSL + } +#endif + } + + template + async_simple::coro::Lazy> async_read_until( + AsioBuffer &buffer, asio::string_view delim) noexcept { +#ifdef CINATRA_ENABLE_SSL + if (use_ssl_) { + return coro_io::async_read_until(*ssl_stream_, buffer, delim); + } + else { +#endif + return coro_io::async_read_until(socket_, buffer, delim); +#ifdef CINATRA_ENABLE_SSL + } +#endif + } + + auto &get_executor() { return *executor_; } + + void close() { + if (has_closed_) { + return; + } + + asio::dispatch(socket_.get_executor(), [this, self = shared_from_this()] { + std::error_code ec; + socket_.shutdown(asio::socket_base::shutdown_both, ec); + socket_.close(ec); + if (quit_cb_) { + quit_cb_(conn_id_); + } + has_closed_ = true; + }); + } + + private: + bool check_keep_alive() { + bool keep_alive = true; + auto val = request_.get_header_value("connection"); + if (!val.empty() && iequal0(val, "close")) { + keep_alive = false; + } + + return keep_alive; + } + + private: + async_simple::Executor *executor_; + asio::ip::tcp::socket socket_; + asio::streambuf head_buf_; + std::string body_; + asio::streambuf chunked_buf_; + http_parser parser_; + bool keep_alive_; + coro_http_request request_; + coro_http_response response_; + std::vector buffers_; + std::atomic has_closed_{false}; + uint64_t conn_id_{0}; + std::function quit_cb_ = nullptr; + +#ifdef CINATRA_ENABLE_SSL + std::unique_ptr ssl_ctx_ = nullptr; + std::unique_ptr> ssl_stream_; + bool use_ssl_ = false; +#endif +}; +} // namespace cinatra \ No newline at end of file diff --git a/include/ylt/thirdparty/cinatra/coro_http_request.hpp b/include/ylt/thirdparty/cinatra/coro_http_request.hpp new file mode 100644 index 000000000..d73fe5f0e --- /dev/null +++ b/include/ylt/thirdparty/cinatra/coro_http_request.hpp @@ -0,0 +1,94 @@ +#pragma once +#include "async_simple/coro/Lazy.h" +#include "define.h" +#include "http_parser.hpp" + +namespace cinatra { +class coro_http_connection; +class coro_http_request { + public: + coro_http_request(http_parser& parser, coro_http_connection* conn) + : parser_(parser), conn_(conn) {} + + std::string_view get_header_value(std::string_view key) { + auto headers = parser_.get_headers(); + for (auto& header : headers) { + if (iequal0(header.name, key)) { + return header.value; + } + } + + return {}; + } + + std::string_view get_query_value(std::string_view key) { + return parser_.get_query_value(key); + } + + std::string get_decode_query_value(std::string_view key) { + auto value = parser_.get_query_value(key); + if (value.empty()) { + return ""; + } + + return code_utils::get_string_by_urldecode(value); + } + + std::span get_headers() const { return parser_.get_headers(); } + + const auto& get_queries() const { return parser_.queries(); } + + void set_body(std::string& body) { + body_ = body; + auto type = get_content_type(); + if (type == content_type::urlencoded) { + parser_.parse_query(body_); + } + } + + std::string_view get_body() const { return body_; } + + bool is_chunked() { + static bool thread_local is_chunk = parser_.is_chunked(); + return is_chunk; + } + + content_type get_content_type() { + static content_type thread_local content_type = get_content_type_impl(); + return content_type; + } + + content_type get_content_type_impl() { + if (is_chunked()) + return content_type::chunked; + + auto content_type = get_header_value("content-type"); + if (!content_type.empty()) { + if (content_type.find("application/x-www-form-urlencoded") != + std::string_view::npos) { + return content_type::urlencoded; + } + else if (content_type.find("multipart/form-data") != + std::string_view::npos) { + return content_type::multipart; + } + else if (content_type.find("application/octet-stream") != + std::string_view::npos) { + return content_type::octet_stream; + } + else { + return content_type::string; + } + } + + return content_type::unknown; + } + + coro_http_connection* get_conn() { return conn_; } + + private: + http_parser& parser_; + std::string_view body_; + coro_http_connection* conn_; +}; +} // namespace cinatra \ No newline at end of file diff --git a/include/ylt/thirdparty/cinatra/coro_http_response.hpp b/include/ylt/thirdparty/cinatra/coro_http_response.hpp new file mode 100644 index 000000000..eb720765d --- /dev/null +++ b/include/ylt/thirdparty/cinatra/coro_http_response.hpp @@ -0,0 +1,177 @@ +#pragma once +#include +#include +#include +#include +#include +#include +#include +#include + +#include "async_simple/coro/Lazy.h" +#include "async_simple/coro/SyncAwait.h" +#include "define.h" +#include "response_cv.hpp" +#include "time_util.hpp" + +namespace cinatra { +struct resp_header { + std::string key; + std::string value; +}; + +struct resp_header_sv { + std::string_view key; + std::string_view value; +}; + +enum class format_type { + normal, + chunked, +}; + +class coro_http_connection; +class coro_http_response { + public: + coro_http_response(coro_http_connection* conn) + : status_(status_type::not_implemented), + fmt_type_(format_type::normal), + delay_(false), + conn_(conn) { + head_.reserve(128); + } + + void set_status(cinatra::status_type status) { status_ = status; } + void set_content(std::string content) { content_ = std::move(content); } + void set_status_and_content(status_type status, std::string content) { + status_ = status; + content_ = std::move(content); + } + void set_delay(bool r) { delay_ = r; } + bool get_delay() const { return delay_; } + void set_format_type(format_type type) { fmt_type_ = type; } + + void add_header(auto k, auto v) { + resp_headers_.emplace_back(resp_header{std::move(k), std::move(v)}); + } + + void set_keepalive(bool r) { keepalive_ = r; } + + void to_buffers(std::vector& buffers) { + build_resp_head(); + + buffers.push_back(asio::buffer(to_rep_string(status_))); + buffers.push_back(asio::buffer(head_)); + if (!content_.empty()) { + if (fmt_type_ == format_type::chunked) { + to_chunked_buffers(buffers, content_, true); + } + else { + buffers.push_back(asio::buffer(content_)); + } + } + } + + std::string_view to_hex_string(size_t val) { + static char buf[20]; + auto [ptr, ec] = std::to_chars(std::begin(buf), std::end(buf), val, 16); + return std::string_view{buf, size_t(std::distance(buf, ptr))}; + } + + void to_chunked_buffers(std::vector& buffers, + std::string_view chunk_data, bool eof) { + if (!chunk_data.empty()) { + // convert bytes transferred count to a hex string. + auto chunk_size = to_hex_string(chunk_data.size()); + + // Construct chunk based on rfc2616 section 3.6.1 + buffers.push_back(asio::buffer(chunk_size)); + buffers.push_back(asio::buffer(crlf)); + buffers.push_back(asio::buffer(chunk_data)); + buffers.push_back(asio::buffer(crlf)); + } + + // append last-chunk + if (eof) { + buffers.push_back(asio::buffer(last_chunk)); + buffers.push_back(asio::buffer(crlf)); + } + } + + void build_resp_head() { + if (std::find_if(resp_headers_.begin(), resp_headers_.end(), + [](resp_header& header) { + return header.key == "Host"; + }) == resp_headers_.end()) { + resp_headers_sv_.emplace_back(resp_header_sv{"Host", "cinatra"}); + } + + if (status_ >= status_type::not_found) { + content_.append(to_string(status_)); + } + + if (fmt_type_ == format_type::chunked) { + resp_headers_sv_.emplace_back( + resp_header_sv{"Transfer-Encoding", "chunked"}); + } + else { + if (!content_.empty()) { + auto [ptr, ec] = std::to_chars(buf_, buf_ + 32, content_.size()); + resp_headers_sv_.emplace_back( + resp_header_sv{"Content-Length", + std::string_view(buf_, std::distance(buf_, ptr))}); + } + else { + resp_headers_sv_.emplace_back(resp_header_sv{"Content-Length", "0"}); + } + } + + resp_headers_sv_.emplace_back(resp_header_sv{"Date", get_gmt_time_str()}); + + if (keepalive_.has_value()) { + bool keepalive = keepalive_.value(); + resp_headers_sv_.emplace_back( + resp_header_sv{"Connection", keepalive ? "keep-alive" : "close"}); + } + + append_head(resp_headers_); + append_head(resp_headers_sv_); + head_.append(CRCF); + } + + coro_http_connection* get_conn() { return conn_; } + + void clear() { + head_.clear(); + content_.clear(); + + resp_headers_.clear(); + resp_headers_sv_.clear(); + keepalive_ = {}; + delay_ = false; + status_ = status_type::init; + fmt_type_ = format_type::normal; + } + + void append_head(auto& headers) { + for (auto& [k, v] : headers) { + head_.append(k); + head_.append(":"); + head_.append(v); + head_.append(CRCF); + } + } + + private: + status_type status_; + format_type fmt_type_; + std::string head_; + std::string content_; + std::optional keepalive_; + bool delay_; + char buf_[32]; + std::vector resp_headers_; + std::vector resp_headers_sv_; + coro_http_connection* conn_; +}; +} // namespace cinatra \ No newline at end of file diff --git a/include/ylt/thirdparty/cinatra/coro_http_router.hpp b/include/ylt/thirdparty/cinatra/coro_http_router.hpp new file mode 100644 index 000000000..f6fa90ff6 --- /dev/null +++ b/include/ylt/thirdparty/cinatra/coro_http_router.hpp @@ -0,0 +1,126 @@ +#pragma once +#include + +#include +#include +#include +#include +#include +#include + +#include "cinatra/cinatra_log_wrapper.hpp" +#include "cinatra/coro_http_request.hpp" +#include "cinatra/function_traits.hpp" +#include "cinatra/response_cv.hpp" +#include "coro_http_response.hpp" + +namespace cinatra { +template