Skip to content

Commit

Permalink
feat: large file upload with websocket-client
Browse files Browse the repository at this point in the history
Add two APIs:
One for websocket chunked transfer,
Another one for splitting large files into multiple websocket requests for transfer.
  • Loading branch information
helintongh committed Nov 1, 2023
1 parent 4c98676 commit 18088c3
Show file tree
Hide file tree
Showing 2 changed files with 108 additions and 0 deletions.
86 changes: 86 additions & 0 deletions include/cinatra/coro_http_client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,92 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
co_return data;
}

async_simple::coro::Lazy<resp_data> async_send_ws_multiple(
std::string msg, bool need_mask = true, opcode op = opcode::text) {
resp_data data{};
size_t msg_length = msg.length();
int msg_offset = 0;

websocket ws{};
if (op == opcode::close) {
msg = ws.format_close_payload(close_code::normal, msg.data(), msg.size());
data = co_await async_send_ws(msg, need_mask, op);
co_return data;
}

while (msg_length > 0) {
if (msg_length > websocket_split_part_size_) {
std::string part(msg.data() + msg_offset, websocket_split_part_size_);
data = co_await async_send_ws(part, need_mask, op);
if (data.net_err) {
co_return data;
}
msg_offset += websocket_split_part_size_;
msg_length -= websocket_split_part_size_;
}
else {
std::string part(msg.data() + msg_offset, msg_length);
msg_length = 0;
data = co_await async_send_ws(part, need_mask, op);
if (data.net_err) {
co_return data;
}
}
}
data.eof = true;
co_return data;
}

async_simple::coro::Lazy<resp_data> async_send_ws_chuncked(
std::string msg, uint64_t max_part_size, bool need_mask = true,
opcode op = opcode::text) {
resp_data data{};

websocket ws{};
if (op == opcode::close) {
msg = ws.format_close_payload(close_code::normal, msg.data(), msg.size());
}

std::string total_data = ws.encode_frame(msg, op, need_mask);
bool is_first_part = true;
total_data.append(msg);
size_t msg_length = total_data.length();
int msg_offset = 0;

while (msg_length > 0) {
std::cout << msg_length << std::endl;
if (msg_length > max_part_size) {
std::string part(total_data.data() + msg_offset, max_part_size);
std::vector<asio::const_buffer> buffers{
asio::buffer(part.data(), max_part_size)};
auto [ec, _] = co_await async_write(buffers);
if (ec) {
data.net_err = ec;
data.status = 404;
co_return data;
}
msg_length -= max_part_size;
msg_offset += max_part_size;
}
else {
std::string last_part(total_data.data() + msg_offset, msg_length);
std::vector<asio::const_buffer> buffers{
asio::buffer(last_part.data(), last_part.size())};
auto [ec, _] = co_await async_write(buffers);
if (ec) {
data.net_err = ec;
data.status = 404;
co_return data;
}

msg_offset += msg_length;
msg_length = 0;
}
}
data.eof = true;
co_return data;
}

async_simple::coro::Lazy<resp_data> async_send_ws_close(
std::string msg = "") {
return async_send_ws(std::move(msg), false, opcode::close);
Expand Down
22 changes: 22 additions & 0 deletions lang/coro_http_client_introduction.md
Original file line number Diff line number Diff line change
Expand Up @@ -465,6 +465,28 @@ enum opcode : std::uint8_t {
async_simple::coro::Lazy<resp_data> async_send_ws(std::string msg,
bool need_mask = true,
opcode op = opcode::text);

/// 后两个api是为了解决待发送的websocket数据过大的问题,
/// websocket数据过大情况下一次性传入所有buffer到asio::async_write会出错

/// 多批次发送websocket 数据,
/// 内部由websocket_split_part_size_变量限制为3M。超过3M分割为多个websocket请求发送
/// \param msg 要发送的websocket 数据
/// \param need_mask 是否需要对数据进行mask,默认会mask
/// \param op opcode 一般为text、binary或 close 等类型
async_simple::coro::Lazy<resp_data> async_send_ws_multiple(std::string msg,
bool need_mask = true,
opcode op = opcode::text);

/// 分段形式发送websocket 数据
/// \param msg 要发送的websocket 数据
/// \param max_part_size 每段发送的数据最大大小
/// \param need_mask 是否需要对数据进行mask,默认会mask
/// \param op opcode 一般为text、binary或 close 等类型
async_simple::coro::Lazy<resp_data> async_send_ws_chuncked(std::string msg,
uint64_t max_part_size,
bool need_mask = true,
opcode op = opcode::text);
```
websocket 例子:
Expand Down

0 comments on commit 18088c3

Please sign in to comment.