diff --git a/include/cinatra/coro_http_client.hpp b/include/cinatra/coro_http_client.hpp index 9ccff808..4dfca64a 100644 --- a/include/cinatra/coro_http_client.hpp +++ b/include/cinatra/coro_http_client.hpp @@ -348,6 +348,92 @@ class coro_http_client : public std::enable_shared_from_this { co_return data; } + async_simple::coro::Lazy async_send_ws_multiple( + std::string msg, bool need_mask = true, opcode op = opcode::text) { + resp_data data{}; + size_t msg_length = msg.length(); + int msg_offset = 0; + + websocket ws{}; + if (op == opcode::close) { + msg = ws.format_close_payload(close_code::normal, msg.data(), msg.size()); + data = co_await async_send_ws(msg, need_mask, op); + co_return data; + } + + while (msg_length > 0) { + if (msg_length > websocket_split_part_size_) { + std::string part(msg.data() + msg_offset, websocket_split_part_size_); + data = co_await async_send_ws(part, need_mask, op); + if (data.net_err) { + co_return data; + } + msg_offset += websocket_split_part_size_; + msg_length -= websocket_split_part_size_; + } + else { + std::string part(msg.data() + msg_offset, msg_length); + msg_length = 0; + data = co_await async_send_ws(part, need_mask, op); + if (data.net_err) { + co_return data; + } + } + } + data.eof = true; + co_return data; + } + + async_simple::coro::Lazy async_send_ws_chuncked( + std::string msg, uint64_t max_part_size, bool need_mask = true, + opcode op = opcode::text) { + resp_data data{}; + + websocket ws{}; + if (op == opcode::close) { + msg = ws.format_close_payload(close_code::normal, msg.data(), msg.size()); + } + + std::string total_data = ws.encode_frame(msg, op, need_mask); + bool is_first_part = true; + total_data.append(msg); + size_t msg_length = total_data.length(); + int msg_offset = 0; + + while (msg_length > 0) { + std::cout << msg_length << std::endl; + if (msg_length > max_part_size) { + std::string part(total_data.data() + msg_offset, max_part_size); + std::vector buffers{ + asio::buffer(part.data(), max_part_size)}; + auto [ec, _] = co_await async_write(buffers); + if (ec) { + data.net_err = ec; + data.status = 404; + co_return data; + } + msg_length -= max_part_size; + msg_offset += max_part_size; + } + else { + std::string last_part(total_data.data() + msg_offset, msg_length); + std::vector buffers{ + asio::buffer(last_part.data(), last_part.size())}; + auto [ec, _] = co_await async_write(buffers); + if (ec) { + data.net_err = ec; + data.status = 404; + co_return data; + } + + msg_offset += msg_length; + msg_length = 0; + } + } + data.eof = true; + co_return data; + } + async_simple::coro::Lazy async_send_ws_close( std::string msg = "") { return async_send_ws(std::move(msg), false, opcode::close); @@ -1781,6 +1867,7 @@ class coro_http_client : public std::enable_shared_from_this { bool enable_tcp_no_delay_ = false; std::string resp_chunk_str_; std::span out_buf_; + static constexpr uint64_t websocket_split_part_size_ = 3145728; #ifdef BENCHMARK_TEST std::string req_str_; diff --git a/lang/coro_http_client_introduction.md b/lang/coro_http_client_introduction.md index fb779cc4..f2e5512b 100644 --- a/lang/coro_http_client_introduction.md +++ b/lang/coro_http_client_introduction.md @@ -465,6 +465,28 @@ enum opcode : std::uint8_t { async_simple::coro::Lazy async_send_ws(std::string msg, bool need_mask = true, opcode op = opcode::text); + +/// 后两个api是为了解决待发送的websocket数据过大的问题, +/// websocket数据过大情况下一次性传入所有buffer到asio::async_write会出错 + +/// 多批次发送websocket 数据, +/// 内部由websocket_split_part_size_变量限制为3M。超过3M分割为多个websocket请求发送 +/// \param msg 要发送的websocket 数据 +/// \param need_mask 是否需要对数据进行mask,默认会mask +/// \param op opcode 一般为text、binary或 close 等类型 +async_simple::coro::Lazy async_send_ws_multiple(std::string msg, + bool need_mask = true, + opcode op = opcode::text); + +/// 分段形式发送websocket 数据 +/// \param msg 要发送的websocket 数据 +/// \param max_part_size 每段发送的数据最大大小 +/// \param need_mask 是否需要对数据进行mask,默认会mask +/// \param op opcode 一般为text、binary或 close 等类型 +async_simple::coro::Lazy async_send_ws_chuncked(std::string msg, + uint64_t max_part_size, + bool need_mask = true, + opcode op = opcode::text); ``` websocket 例子: