Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[coro_htttp][feat]update coro_http #567

Merged
merged 2 commits into from
Jan 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 87 additions & 36 deletions include/ylt/thirdparty/cinatra/coro_http_client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "async_simple/coro/Lazy.h"
#include "cinatra_log_wrapper.hpp"
#include "http_parser.hpp"
#include "multipart.hpp"
#include "picohttpparser.h"
#include "response_cv.hpp"
#include "string_resize.hpp"
Expand Down Expand Up @@ -147,7 +148,7 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
: executor_wrapper_(executor),
timer_(&executor_wrapper_),
socket_(std::make_shared<socket_t>(executor)),
read_buf_(socket_->read_buf_),
head_buf_(socket_->head_buf_),
chunked_buf_(socket_->chunked_buf_) {}

coro_http_client(
Expand Down Expand Up @@ -185,9 +186,9 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
return true;
}

~coro_http_client() { async_close(); }
~coro_http_client() { close(); }

void async_close() {
void close() {
if (socket_ == nullptr || socket_->has_closed_)
return;

Expand Down Expand Up @@ -505,7 +506,7 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
co_return data;
}

std::tie(ec, size) = co_await async_read(read_buf_, total_len_);
std::tie(ec, size) = co_await async_read(head_buf_, total_len_);

if (ec) {
if (!stop_bench_)
Expand All @@ -517,16 +518,16 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
}
else {
const char *data_ptr =
asio::buffer_cast<const char *>(read_buf_.data());
read_buf_.consume(total_len_);
asio::buffer_cast<const char *>(head_buf_.data());
head_buf_.consume(total_len_);
// check status
if (data_ptr[9] > '3') {
data.status = 404;
co_return data;
}
}

read_buf_.consume(total_len_);
head_buf_.consume(total_len_);
data.status = 200;
data.total = total_len_;

Expand Down Expand Up @@ -891,14 +892,14 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {

std::string file_data;
detail::resize(file_data, max_single_part_size_);
std::string chunk_size_str;

if constexpr (is_stream_file) {
while (!source->eof()) {
size_t rd_size =
source->read(file_data.data(), file_data.size()).gcount();
auto bufs = cinatra::to_chunked_buffers<asio::const_buffer>(
file_data.data(), rd_size, chunk_size_str, source->eof());
std::vector<asio::const_buffer> bufs;
cinatra::to_chunked_buffers(bufs, {file_data.data(), rd_size},
source->eof());
if (std::tie(ec, size) = co_await async_write(bufs); ec) {
break;
}
Expand All @@ -915,19 +916,20 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
while (!file.eof()) {
auto [rd_ec, rd_size] =
co_await file.async_read(file_data.data(), file_data.size());
auto bufs = cinatra::to_chunked_buffers<asio::const_buffer>(
file_data.data(), rd_size, chunk_size_str, file.eof());
std::vector<asio::const_buffer> bufs;
cinatra::to_chunked_buffers(bufs, {file_data.data(), rd_size},
file.eof());
if (std::tie(ec, size) = co_await async_write(bufs); ec) {
break;
}
}
}
else {
std::string chunk_size_str;
while (true) {
auto result = co_await source();
auto bufs = cinatra::to_chunked_buffers<asio::const_buffer>(
result.buf.data(), result.buf.size(), chunk_size_str, result.eof);
std::vector<asio::const_buffer> bufs;
cinatra::to_chunked_buffers(
bufs, {result.buf.data(), result.buf.size()}, result.eof);
if (std::tie(ec, size) = co_await async_write(bufs); ec) {
break;
}
Expand Down Expand Up @@ -1168,7 +1170,7 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
struct socket_t {
asio::ip::tcp::socket impl_;
std::atomic<bool> has_closed_ = true;
asio::streambuf read_buf_;
asio::streambuf head_buf_;
asio::streambuf chunked_buf_;
#ifdef CINATRA_ENABLE_SSL
std::unique_ptr<asio::ssl::stream<asio::ip::tcp::socket &>> ssl_stream_;
Expand All @@ -1184,6 +1186,9 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
std::pair<bool, uri_t> handle_uri(resp_data &data, const S &uri) {
uri_t u;
if (!u.parse_from(uri.data())) {
CINATRA_LOG_WARNING
<< uri
<< ", the url is not right, maybe need to encode the url firstly";
data.net_err = std::make_error_code(std::errc::protocol_error);
data.status = 404;
return {false, {}};
Expand Down Expand Up @@ -1320,7 +1325,7 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
std::error_code handle_header(resp_data &data, http_parser &parser,
size_t header_size) {
// parse header
const char *data_ptr = asio::buffer_cast<const char *>(read_buf_.data());
const char *data_ptr = asio::buffer_cast<const char *>(head_buf_.data());

int parse_ret = parser.parse_response(data_ptr, header_size, 0);
#ifdef INJECT_FOR_HTTP_CLIENT_TEST
Expand All @@ -1334,7 +1339,7 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
#endif
return std::make_error_code(std::errc::protocol_error);
}
read_buf_.consume(header_size); // header size
head_buf_.consume(header_size); // header size
data.resp_headers = parser.get_headers();
data.status = parser.status();
return {};
Expand All @@ -1348,7 +1353,7 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
http_method method) {
resp_data data{};
do {
if (std::tie(ec, size) = co_await async_read_until(read_buf_, TWO_CRCF);
if (std::tie(ec, size) = co_await async_read_until(head_buf_, TWO_CRCF);
ec) {
break;
}
Expand Down Expand Up @@ -1377,16 +1382,28 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
}
if (parser_.is_chunked()) {
is_keep_alive = true;
if (read_buf_.size() > 0) {
if (head_buf_.size() > 0) {
const char *data_ptr =
asio::buffer_cast<const char *>(read_buf_.data());
chunked_buf_.sputn(data_ptr, read_buf_.size());
read_buf_.consume(read_buf_.size());
asio::buffer_cast<const char *>(head_buf_.data());
chunked_buf_.sputn(data_ptr, head_buf_.size());
head_buf_.consume(head_buf_.size());
}
ec = co_await handle_chunked(data, std::move(ctx));
break;
}

if (parser_.is_multipart()) {
is_keep_alive = true;
if (head_buf_.size() > 0) {
const char *data_ptr =
asio::buffer_cast<const char *>(head_buf_.data());
chunked_buf_.sputn(data_ptr, head_buf_.size());
head_buf_.consume(head_buf_.size());
}
ec = co_await handle_multipart(data, std::move(ctx));
break;
}

redirect_uri_.clear();
bool is_redirect = parser_.is_location();
if (is_redirect)
Expand All @@ -1406,29 +1423,29 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
}
}

if (content_len <= read_buf_.size()) {
if (content_len <= head_buf_.size()) {
// Now get entire content, additional data will discard.
// copy body.
if (content_len > 0) {
auto data_ptr = asio::buffer_cast<const char *>(read_buf_.data());
auto data_ptr = asio::buffer_cast<const char *>(head_buf_.data());
if (is_out_buf) {
memcpy(out_buf_.data(), data_ptr, content_len);
}
else {
detail::resize(body_, content_len);
memcpy(body_.data(), data_ptr, content_len);
}
read_buf_.consume(read_buf_.size());
head_buf_.consume(head_buf_.size());
}
co_await handle_entire_content(data, content_len, is_ranges, ctx);
break;
}

// read left part of content.
size_t part_size = read_buf_.size();
size_t part_size = head_buf_.size();
size_t size_to_read = content_len - part_size;

auto data_ptr = asio::buffer_cast<const char *>(read_buf_.data());
auto data_ptr = asio::buffer_cast<const char *>(head_buf_.data());
if (is_out_buf) {
memcpy(out_buf_.data(), data_ptr, part_size);
}
Expand All @@ -1437,7 +1454,7 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
memcpy(body_.data(), data_ptr, part_size);
}

read_buf_.consume(part_size);
head_buf_.consume(part_size);

if (is_out_buf) {
if (std::tie(ec, size) = co_await async_read(
Expand Down Expand Up @@ -1474,7 +1491,7 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
auto &ctx) {
if (content_len > 0) {
const char *data_ptr;
if (read_buf_.size() == 0) {
if (head_buf_.size() == 0) {
if (out_buf_.empty()) {
data_ptr = body_.data();
}
Expand All @@ -1483,7 +1500,7 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
}
}
else {
data_ptr = asio::buffer_cast<const char *>(read_buf_.data());
data_ptr = asio::buffer_cast<const char *>(head_buf_.data());
}

if (is_ranges) {
Expand All @@ -1499,9 +1516,9 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
std::string_view reply(data_ptr, content_len);
data.resp_body = reply;

read_buf_.consume(content_len);
head_buf_.consume(content_len);
}
data.eof = (read_buf_.size() == 0);
data.eof = (head_buf_.size() == 0);
}

void handle_result(resp_data &data, std::error_code ec, bool is_keep_alive) {
Expand All @@ -1522,6 +1539,39 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
}
}

template <typename String>
async_simple::coro::Lazy<std::error_code> handle_multipart(
resp_data &data, req_context<String> ctx) {
std::error_code ec{};
std::string boundary = std::string{parser_.get_boundary()};
multipart_reader_t multipart(this);
while (true) {
auto part_head = co_await multipart.read_part_head();
if (part_head.ec) {
co_return part_head.ec;
}

auto part_body = co_await multipart.read_part_body(boundary);

if (ctx.stream) {
ec = co_await ctx.stream->async_write(part_body.data.data(),
part_body.data.size());
}
else {
resp_chunk_str_.append(part_body.data.data(), part_body.data.size());
}

if (part_body.ec) {
co_return part_body.ec;
}

if (part_body.eof) {
break;
}
}
co_return ec;
}

template <typename String>
async_simple::coro::Lazy<std::error_code> handle_chunked(
resp_data &data, req_context<String> ctx) {
Expand Down Expand Up @@ -1721,12 +1771,12 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
async_simple::coro::Lazy<void> async_read_ws() {
resp_data data{};

read_buf_.consume(read_buf_.size());
head_buf_.consume(head_buf_.size());
size_t header_size = 2;
std::shared_ptr sock = socket_;
auto on_ws_msg = std::move(on_ws_msg_);
auto on_ws_close = std::move(on_ws_close_);
asio::streambuf &read_buf = sock->read_buf_;
asio::streambuf &read_buf = sock->head_buf_;
bool has_init_ssl = false;
#ifdef CINATRA_ENABLE_SSL
has_init_ssl = has_init_ssl_;
Expand Down Expand Up @@ -1927,11 +1977,12 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
return has_http_scheme;
}

friend class multipart_reader_t<coro_http_client>;
http_parser parser_;
coro_io::ExecutorWrapper<> executor_wrapper_;
coro_io::period_timer timer_;
std::shared_ptr<socket_t> socket_;
asio::streambuf &read_buf_;
asio::streambuf &head_buf_;
asio::streambuf &chunked_buf_;
std::string body_;

Expand Down
Loading
Loading