Skip to content

Commit

Permalink
[coro_http][fix][feature]coro_http (#661)
Browse files Browse the repository at this point in the history
  • Loading branch information
qicosmos authored Apr 18, 2024
1 parent 7349bd0 commit 9641d2a
Show file tree
Hide file tree
Showing 10 changed files with 254 additions and 234 deletions.
21 changes: 10 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -381,23 +381,22 @@ int main() {
### websocket
```c++
async_simple::coro::Lazy<void> websocket(coro_http_client &client) {
client.on_ws_close([](std::string_view reason) {
std::cout << "web socket close " << reason << std::endl;
});
client.on_ws_msg([](resp_data data) {
std::cout << data.resp_body << std::endl;
});
// connect to your websocket server.
bool r = co_await client.async_connect("ws://example.com/ws");
if (!r) {
co_return;
}
co_await client.async_send_ws("hello websocket");
co_await client.async_send_ws("test again", /*need_mask = */ false);
co_await client.async_send_ws_close("ws close reason");
co_await client.write_websocket("hello websocket");
auto data = co_await client.read_websocket();
CHECK(data.resp_body == "hello websocket");
co_await client.write_websocket("test again");
data = co_await client.read_websocket();
CHECK(data.resp_body == "test again");
co_await client.write_websocket("ws close");
data = co_await client.read_websocket();
CHECK(data.net_err == asio::error::eof);
CHECK(data.resp_body == "ws close");
}
```

Expand Down
136 changes: 59 additions & 77 deletions include/ylt/standalone/cinatra/coro_http_client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,21 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
{
auto time_out_guard =
timer_guard(this, conn_timeout_duration_, "connect timer");
if (u.is_websocket()) {
// build websocket http header
add_header("Upgrade", "websocket");
add_header("Connection", "Upgrade");
if (ws_sec_key_.empty()) {
ws_sec_key_ = "s//GYHa/XO7Hd2F2eOGfyA=="; // provide a random string.
}
add_header("Sec-WebSocket-Key", ws_sec_key_);
add_header("Sec-WebSocket-Version", "13");

req_context<> ctx{};
data = co_await async_request(std::move(uri), http_method::GET,
std::move(ctx));
co_return data;
}
data = co_await connect(u);
}
if (socket_->is_timeout_) {
Expand Down Expand Up @@ -319,50 +334,41 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {

void set_ws_sec_key(std::string sec_key) { ws_sec_key_ = std::move(sec_key); }

async_simple::coro::Lazy<bool> async_ws_connect(std::string uri) {
resp_data data{};
auto [r, u] = handle_uri(data, uri);
if (!r) {
CINATRA_LOG_WARNING << "url error:";
co_return false;
}
async_simple::coro::Lazy<resp_data> read_websocket() {
co_return co_await async_read_ws();
}

req_context<> ctx{};
if (u.is_websocket()) {
// build websocket http header
add_header("Upgrade", "websocket");
add_header("Connection", "Upgrade");
if (ws_sec_key_.empty()) {
ws_sec_key_ = "s//GYHa/XO7Hd2F2eOGfyA=="; // provide a random string.
}
add_header("Sec-WebSocket-Key", ws_sec_key_);
add_header("Sec-WebSocket-Version", "13");
}
async_simple::coro::Lazy<resp_data> write_websocket(
const char *data, opcode op = opcode::text) {
std::string str(data);
co_return co_await write_websocket(str, op);
}

data = co_await async_request(std::move(uri), http_method::GET,
std::move(ctx));
async_read_ws().start([](auto &&) {
});
co_return !data.net_err;
async_simple::coro::Lazy<resp_data> write_websocket(
const char *data, size_t size, opcode op = opcode::text) {
std::string str(data, size);
co_return co_await write_websocket(str, op);
}

async_simple::coro::Lazy<resp_data> async_send_ws(const char *data,
bool need_mask = true,
opcode op = opcode::text) {
async_simple::coro::Lazy<resp_data> write_websocket(
std::string_view data, opcode op = opcode::text) {
std::string str(data);
co_return co_await async_send_ws(std::span<char>(str), need_mask, op);
co_return co_await write_websocket(str, op);
}

async_simple::coro::Lazy<resp_data> async_send_ws(std::string data,
bool need_mask = true,
opcode op = opcode::text) {
co_return co_await async_send_ws(std::span<char>(data), need_mask, op);
async_simple::coro::Lazy<resp_data> write_websocket(
std::string &data, opcode op = opcode::text) {
co_return co_await write_websocket(std::span<char>(data), op);
}

async_simple::coro::Lazy<resp_data> write_websocket(
std::string &&data, opcode op = opcode::text) {
co_return co_await write_websocket(std::span<char>(data), op);
}

template <typename Source>
async_simple::coro::Lazy<resp_data> async_send_ws(Source source,
bool need_mask = true,
opcode op = opcode::text) {
async_simple::coro::Lazy<resp_data> write_websocket(
Source source, opcode op = opcode::text) {
resp_data data{};

websocket ws{};
Expand All @@ -376,7 +382,7 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
}

if constexpr (is_span_v<Source>) {
std::string encode_header = ws.encode_frame(source, op, need_mask);
std::string encode_header = ws.encode_frame(source, op, true);
std::vector<asio::const_buffer> buffers{
asio::buffer(encode_header.data(), encode_header.size()),
asio::buffer(source.data(), source.size())};
Expand All @@ -392,8 +398,7 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
auto result = co_await source();

std::span<char> msg(result.buf.data(), result.buf.size());
std::string encode_header =
ws.encode_frame(msg, op, need_mask, result.eof);
std::string encode_header = ws.encode_frame(msg, op, result.eof);
std::vector<asio::const_buffer> buffers{
asio::buffer(encode_header.data(), encode_header.size()),
asio::buffer(msg.data(), msg.size())};
Expand All @@ -414,16 +419,9 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
co_return data;
}

async_simple::coro::Lazy<resp_data> async_send_ws_close(
async_simple::coro::Lazy<resp_data> write_websocket_close(
std::string msg = "") {
co_return co_await async_send_ws(std::move(msg), false, opcode::close);
}

void on_ws_msg(std::function<void(resp_data)> on_ws_msg) {
on_ws_msg_ = std::move(on_ws_msg);
}
void on_ws_close(std::function<void(std::string_view)> on_ws_close) {
on_ws_close_ = std::move(on_ws_close);
co_return co_await write_websocket(std::move(msg), opcode::close);
}

#ifdef BENCHMARK_TEST
Expand Down Expand Up @@ -1625,14 +1623,6 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
break;
}

if (chunk_size == 0) {
// all finished, no more data
chunked_buf_.consume(CRCF.size());
data.status = 200;
data.eof = true;
break;
}

if (additional_size < size_t(chunk_size + 2)) {
// not a complete chunk, read left chunk data.
size_t size_to_read = chunk_size + 2 - additional_size;
Expand All @@ -1643,6 +1633,14 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
}
}

if (chunk_size == 0) {
// all finished, no more data
chunked_buf_.consume(chunked_buf_.size());
data.status = 200;
data.eof = true;
break;
}

data_ptr = asio::buffer_cast<const char *>(chunked_buf_.data());
if (ctx.stream) {
ec = co_await ctx.stream->async_write(data_ptr, chunk_size);
Expand Down Expand Up @@ -1782,15 +1780,12 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
co_return resp_data{{}, 200};
}

// this function must be called before async_ws_connect.
async_simple::coro::Lazy<void> async_read_ws() {
async_simple::coro::Lazy<resp_data> async_read_ws() {
resp_data data{};

head_buf_.consume(head_buf_.size());
size_t header_size = 2;
std::shared_ptr sock = socket_;
auto on_ws_msg = on_ws_msg_;
auto on_ws_close = on_ws_close_;
asio::streambuf &read_buf = sock->head_buf_;
bool has_init_ssl = false;
#ifdef CINATRA_ENABLE_SSL
Expand All @@ -1805,14 +1800,11 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
data.status = 404;

if (sock->has_closed_) {
co_return;
co_return data;
}

close_socket(*sock);

if (on_ws_msg)
on_ws_msg(data);
co_return;
co_return data;
}

const char *data_ptr = asio::buffer_cast<const char *>(read_buf.data());
Expand All @@ -1835,9 +1827,7 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
data.net_err = ec;
data.status = 404;
close_socket(*sock);
if (on_ws_msg)
on_ws_msg(data);
co_return;
co_return data;
}
}

Expand All @@ -1856,14 +1846,11 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
header_size = 2;

if (is_close_frame) {
if (on_ws_close)
on_ws_close(data.resp_body);

std::string reason = "close";
auto close_str = ws.format_close_payload(close_code::normal,
reason.data(), reason.size());
auto span = std::span<char>(close_str);
std::string encode_header = ws.encode_frame(span, opcode::close, false);
std::string encode_header = ws.encode_frame(span, opcode::close, true);
std::vector<asio::const_buffer> buffers{asio::buffer(encode_header),
asio::buffer(reason)};

Expand All @@ -1873,12 +1860,9 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {

data.net_err = asio::error::eof;
data.status = 404;
if (on_ws_msg)
on_ws_msg(data);
co_return;
co_return data;
}
if (on_ws_msg)
on_ws_msg(data);
co_return data;
}
}

Expand Down Expand Up @@ -2019,8 +2003,6 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
std::map<std::string, multipart_t> form_data_;
size_t max_single_part_size_ = 1024 * 1024;

std::function<void(resp_data)> on_ws_msg_;
std::function<void(std::string_view)> on_ws_close_;
std::string ws_sec_key_;
std::string host_;
std::string port_;
Expand Down
Loading

0 comments on commit 9641d2a

Please sign in to comment.