Skip to content

Commit

Permalink
update coro_http
Browse files Browse the repository at this point in the history
  • Loading branch information
qicosmos committed Jan 16, 2024
1 parent aa43023 commit b6d6fdd
Show file tree
Hide file tree
Showing 27 changed files with 2,910 additions and 1,539 deletions.
123 changes: 87 additions & 36 deletions include/ylt/thirdparty/cinatra/coro_http_client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "async_simple/coro/Lazy.h"
#include "cinatra_log_wrapper.hpp"
#include "http_parser.hpp"
#include "multipart.hpp"
#include "picohttpparser.h"
#include "response_cv.hpp"
#include "string_resize.hpp"
Expand Down Expand Up @@ -147,7 +148,7 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
: executor_wrapper_(executor),
timer_(&executor_wrapper_),
socket_(std::make_shared<socket_t>(executor)),
read_buf_(socket_->read_buf_),
head_buf_(socket_->head_buf_),
chunked_buf_(socket_->chunked_buf_) {}

coro_http_client(
Expand Down Expand Up @@ -185,9 +186,9 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
return true;
}

~coro_http_client() { async_close(); }
~coro_http_client() { close(); }

void async_close() {
void close() {
if (socket_ == nullptr || socket_->has_closed_)
return;

Expand Down Expand Up @@ -505,7 +506,7 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
co_return data;
}

std::tie(ec, size) = co_await async_read(read_buf_, total_len_);
std::tie(ec, size) = co_await async_read(head_buf_, total_len_);

if (ec) {
if (!stop_bench_)
Expand All @@ -517,16 +518,16 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
}
else {
const char *data_ptr =
asio::buffer_cast<const char *>(read_buf_.data());
read_buf_.consume(total_len_);
asio::buffer_cast<const char *>(head_buf_.data());
head_buf_.consume(total_len_);
// check status
if (data_ptr[9] > '3') {
data.status = 404;
co_return data;
}
}

read_buf_.consume(total_len_);
head_buf_.consume(total_len_);
data.status = 200;
data.total = total_len_;

Expand Down Expand Up @@ -891,14 +892,14 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {

std::string file_data;
detail::resize(file_data, max_single_part_size_);
std::string chunk_size_str;

if constexpr (is_stream_file) {
while (!source->eof()) {
size_t rd_size =
source->read(file_data.data(), file_data.size()).gcount();
auto bufs = cinatra::to_chunked_buffers<asio::const_buffer>(
file_data.data(), rd_size, chunk_size_str, source->eof());
std::vector<asio::const_buffer> bufs;
cinatra::to_chunked_buffers(bufs, {file_data.data(), rd_size},
source->eof());
if (std::tie(ec, size) = co_await async_write(bufs); ec) {
break;
}
Expand All @@ -915,19 +916,20 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
while (!file.eof()) {
auto [rd_ec, rd_size] =
co_await file.async_read(file_data.data(), file_data.size());
auto bufs = cinatra::to_chunked_buffers<asio::const_buffer>(
file_data.data(), rd_size, chunk_size_str, file.eof());
std::vector<asio::const_buffer> bufs;
cinatra::to_chunked_buffers(bufs, {file_data.data(), rd_size},
file.eof());
if (std::tie(ec, size) = co_await async_write(bufs); ec) {
break;
}
}
}
else {
std::string chunk_size_str;
while (true) {
auto result = co_await source();
auto bufs = cinatra::to_chunked_buffers<asio::const_buffer>(
result.buf.data(), result.buf.size(), chunk_size_str, result.eof);
std::vector<asio::const_buffer> bufs;
cinatra::to_chunked_buffers(
bufs, {result.buf.data(), result.buf.size()}, result.eof);
if (std::tie(ec, size) = co_await async_write(bufs); ec) {
break;
}
Expand Down Expand Up @@ -1168,7 +1170,7 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
struct socket_t {
asio::ip::tcp::socket impl_;
std::atomic<bool> has_closed_ = true;
asio::streambuf read_buf_;
asio::streambuf head_buf_;
asio::streambuf chunked_buf_;
#ifdef CINATRA_ENABLE_SSL
std::unique_ptr<asio::ssl::stream<asio::ip::tcp::socket &>> ssl_stream_;
Expand All @@ -1184,6 +1186,9 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
std::pair<bool, uri_t> handle_uri(resp_data &data, const S &uri) {
uri_t u;
if (!u.parse_from(uri.data())) {
CINATRA_LOG_WARNING
<< uri
<< ", the url is not right, maybe need to encode the url firstly";
data.net_err = std::make_error_code(std::errc::protocol_error);
data.status = 404;
return {false, {}};
Expand Down Expand Up @@ -1320,7 +1325,7 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
std::error_code handle_header(resp_data &data, http_parser &parser,
size_t header_size) {
// parse header
const char *data_ptr = asio::buffer_cast<const char *>(read_buf_.data());
const char *data_ptr = asio::buffer_cast<const char *>(head_buf_.data());

int parse_ret = parser.parse_response(data_ptr, header_size, 0);
#ifdef INJECT_FOR_HTTP_CLIENT_TEST
Expand All @@ -1334,7 +1339,7 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
#endif
return std::make_error_code(std::errc::protocol_error);
}
read_buf_.consume(header_size); // header size
head_buf_.consume(header_size); // header size
data.resp_headers = parser.get_headers();
data.status = parser.status();
return {};
Expand All @@ -1348,7 +1353,7 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
http_method method) {
resp_data data{};
do {
if (std::tie(ec, size) = co_await async_read_until(read_buf_, TWO_CRCF);
if (std::tie(ec, size) = co_await async_read_until(head_buf_, TWO_CRCF);
ec) {
break;
}
Expand Down Expand Up @@ -1377,16 +1382,28 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
}
if (parser_.is_chunked()) {
is_keep_alive = true;
if (read_buf_.size() > 0) {
if (head_buf_.size() > 0) {
const char *data_ptr =
asio::buffer_cast<const char *>(read_buf_.data());
chunked_buf_.sputn(data_ptr, read_buf_.size());
read_buf_.consume(read_buf_.size());
asio::buffer_cast<const char *>(head_buf_.data());
chunked_buf_.sputn(data_ptr, head_buf_.size());
head_buf_.consume(head_buf_.size());
}
ec = co_await handle_chunked(data, std::move(ctx));
break;
}

if (parser_.is_multipart()) {
is_keep_alive = true;
if (head_buf_.size() > 0) {
const char *data_ptr =
asio::buffer_cast<const char *>(head_buf_.data());
chunked_buf_.sputn(data_ptr, head_buf_.size());
head_buf_.consume(head_buf_.size());
}
ec = co_await handle_multipart(data, std::move(ctx));
break;
}

redirect_uri_.clear();
bool is_redirect = parser_.is_location();
if (is_redirect)
Expand All @@ -1406,29 +1423,29 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
}
}

if (content_len <= read_buf_.size()) {
if (content_len <= head_buf_.size()) {
// Now get entire content, additional data will discard.
// copy body.
if (content_len > 0) {
auto data_ptr = asio::buffer_cast<const char *>(read_buf_.data());
auto data_ptr = asio::buffer_cast<const char *>(head_buf_.data());
if (is_out_buf) {
memcpy(out_buf_.data(), data_ptr, content_len);
}
else {
detail::resize(body_, content_len);
memcpy(body_.data(), data_ptr, content_len);
}
read_buf_.consume(read_buf_.size());
head_buf_.consume(head_buf_.size());
}
co_await handle_entire_content(data, content_len, is_ranges, ctx);
break;
}

// read left part of content.
size_t part_size = read_buf_.size();
size_t part_size = head_buf_.size();
size_t size_to_read = content_len - part_size;

auto data_ptr = asio::buffer_cast<const char *>(read_buf_.data());
auto data_ptr = asio::buffer_cast<const char *>(head_buf_.data());
if (is_out_buf) {
memcpy(out_buf_.data(), data_ptr, part_size);
}
Expand All @@ -1437,7 +1454,7 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
memcpy(body_.data(), data_ptr, part_size);
}

read_buf_.consume(part_size);
head_buf_.consume(part_size);

if (is_out_buf) {
if (std::tie(ec, size) = co_await async_read(
Expand Down Expand Up @@ -1474,7 +1491,7 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
auto &ctx) {
if (content_len > 0) {
const char *data_ptr;
if (read_buf_.size() == 0) {
if (head_buf_.size() == 0) {
if (out_buf_.empty()) {
data_ptr = body_.data();
}
Expand All @@ -1483,7 +1500,7 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
}
}
else {
data_ptr = asio::buffer_cast<const char *>(read_buf_.data());
data_ptr = asio::buffer_cast<const char *>(head_buf_.data());
}

if (is_ranges) {
Expand All @@ -1499,9 +1516,9 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
std::string_view reply(data_ptr, content_len);
data.resp_body = reply;

read_buf_.consume(content_len);
head_buf_.consume(content_len);
}
data.eof = (read_buf_.size() == 0);
data.eof = (head_buf_.size() == 0);
}

void handle_result(resp_data &data, std::error_code ec, bool is_keep_alive) {
Expand All @@ -1522,6 +1539,39 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
}
}

template <typename String>
async_simple::coro::Lazy<std::error_code> handle_multipart(
resp_data &data, req_context<String> ctx) {
std::error_code ec{};
std::string boundary = std::string{parser_.get_boundary()};
multipart_reader_t multipart(this);
while (true) {
auto part_head = co_await multipart.read_part_head();
if (part_head.ec) {
co_return part_head.ec;
}

auto part_body = co_await multipart.read_part_body(boundary);

if (ctx.stream) {
ec = co_await ctx.stream->async_write(part_body.data.data(),
part_body.data.size());
}
else {
resp_chunk_str_.append(part_body.data.data(), part_body.data.size());
}

if (part_body.ec) {
co_return part_body.ec;
}

if (part_body.eof) {
break;
}
}
co_return ec;
}

template <typename String>
async_simple::coro::Lazy<std::error_code> handle_chunked(
resp_data &data, req_context<String> ctx) {
Expand Down Expand Up @@ -1721,12 +1771,12 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
async_simple::coro::Lazy<void> async_read_ws() {
resp_data data{};

read_buf_.consume(read_buf_.size());
head_buf_.consume(head_buf_.size());
size_t header_size = 2;
std::shared_ptr sock = socket_;
auto on_ws_msg = std::move(on_ws_msg_);
auto on_ws_close = std::move(on_ws_close_);
asio::streambuf &read_buf = sock->read_buf_;
asio::streambuf &read_buf = sock->head_buf_;
bool has_init_ssl = false;
#ifdef CINATRA_ENABLE_SSL
has_init_ssl = has_init_ssl_;
Expand Down Expand Up @@ -1927,11 +1977,12 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
return has_http_scheme;
}

friend class multipart_reader_t<coro_http_client>;
http_parser parser_;
coro_io::ExecutorWrapper<> executor_wrapper_;
coro_io::period_timer timer_;
std::shared_ptr<socket_t> socket_;
asio::streambuf &read_buf_;
asio::streambuf &head_buf_;
asio::streambuf &chunked_buf_;
std::string body_;

Expand Down
Loading

0 comments on commit b6d6fdd

Please sign in to comment.