Skip to content

Commit

Permalink
support chunked ws (qicosmos#446)
Browse files Browse the repository at this point in the history
  • Loading branch information
qicosmos authored Nov 24, 2023
1 parent 2dd31af commit f096469
Show file tree
Hide file tree
Showing 8 changed files with 196 additions and 58 deletions.
1 change: 1 addition & 0 deletions .github/workflows/mac.yml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ jobs:

- name: Upload Artifact
uses: actions/upload-artifact@v3
if: failure()
with:
name: LastTest.log
if-no-files-found: ignore
Expand Down
1 change: 1 addition & 0 deletions include/cinatra.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#define CINATRA_CINATRA_HPP

#include "cinatra/coro_http_client.hpp"
#include "cinatra/coro_http_server.hpp"
#include "cinatra/http_server.hpp"
#include "cinatra/smtp_client.hpp"

Expand Down
2 changes: 1 addition & 1 deletion include/cinatra/connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -1247,7 +1247,7 @@ class connection : public base_connection,
case cinatra::ws_frame_type::WS_CLOSE_FRAME: {
close_frame close_frame =
ws_.parse_close_payload(payload.data(), payload.length());
size_t len = std::min<size_t>(MAX_CLOSE_PAYLOAD, payload.length());
size_t len = std::min<size_t>(MAX_CLOSE_PAYLOAD, close_frame.length);
req_.set_part_data({close_frame.message, len});
req_.call_event(data_proc_state::data_close);

Expand Down
81 changes: 68 additions & 13 deletions include/cinatra/coro_http_client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,17 @@ struct is_stream<
template <class T>
constexpr bool is_stream_v = is_stream<T>::value;

template <class, class = void>
struct is_span : std::false_type {};

template <class T>
struct is_span<T, std::void_t<decltype(std::declval<T>().data(),
std::declval<T>().size())>>
: std::true_type {};

template <class T>
constexpr bool is_span_v = is_span<T>::value;

template <class, class = void>
struct is_smart_ptr : std::false_type {};

Expand Down Expand Up @@ -105,7 +116,7 @@ struct multipart_t {
};

struct read_result {
std::string_view buf;
std::span<char> buf;
bool eof;
std::error_code err;
};
Expand Down Expand Up @@ -330,33 +341,77 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
co_return !data.net_err;
}

async_simple::coro::Lazy<resp_data> async_send_ws(std::string msg,
async_simple::coro::Lazy<resp_data> async_send_ws(const char *data,
bool need_mask = true,
opcode op = opcode::text) {
std::string str(data);
co_return co_await async_send_ws(std::span<char>(str), need_mask, op);
}

async_simple::coro::Lazy<resp_data> async_send_ws(std::string data,
bool need_mask = true,
opcode op = opcode::text) {
co_return co_await async_send_ws(std::span<char>(data), need_mask, op);
}

template <typename Source>
async_simple::coro::Lazy<resp_data> async_send_ws(Source source,
bool need_mask = true,
opcode op = opcode::text) {
resp_data data{};

websocket ws{};
std::string close_str;
if (op == opcode::close) {
msg = ws.format_close_payload(close_code::normal, msg.data(), msg.size());
if constexpr (is_span_v<Source>) {
close_str = ws.format_close_payload(close_code::normal, source.data(),
source.size());
source = {close_str.data(), close_str.size()};
}
}

std::string encode_header = ws.encode_frame(msg, op, need_mask);
std::vector<asio::const_buffer> buffers{
asio::buffer(encode_header.data(), encode_header.size()),
asio::buffer(msg.data(), msg.size())};
if constexpr (is_span_v<Source>) {
std::string encode_header = ws.encode_frame(source, op, need_mask);
std::vector<asio::const_buffer> buffers{
asio::buffer(encode_header.data(), encode_header.size()),
asio::buffer(source.data(), source.size())};

auto [ec, _] = co_await async_write(buffers);
if (ec) {
data.net_err = ec;
data.status = 404;
auto [ec, _] = co_await async_write(buffers);
if (ec) {
data.net_err = ec;
data.status = 404;
}
}
else {
while (true) {
auto result = co_await source();

std::span<char> msg(result.buf.data(), result.buf.size());
std::string encode_header =
ws.encode_frame(msg, op, need_mask, result.eof);
std::vector<asio::const_buffer> buffers{
asio::buffer(encode_header.data(), encode_header.size()),
asio::buffer(msg.data(), msg.size())};

auto [ec, _] = co_await async_write(buffers);
if (ec) {
data.net_err = ec;
data.status = 404;
break;
}

if (result.eof) {
break;
}
}
}

co_return data;
}

async_simple::coro::Lazy<resp_data> async_send_ws_close(
std::string msg = "") {
return async_send_ws(std::move(msg), false, opcode::close);
co_return co_await async_send_ws(std::move(msg), false, opcode::close);
}

void on_ws_msg(std::function<void(resp_data)> on_ws_msg) {
Expand Down Expand Up @@ -1654,7 +1709,7 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {

data_ptr = asio::buffer_cast<const char *>(read_buf_.data());
if (is_close_frame) {
payload_len -= 4;
payload_len -= 2;
data_ptr += sizeof(uint16_t);
}

Expand Down
9 changes: 8 additions & 1 deletion include/cinatra/coro_http_connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ struct websocket_result {
std::error_code ec;
ws_frame_type type;
std::string_view data;
bool eof;
};

class coro_http_connection
Expand Down Expand Up @@ -320,8 +321,8 @@ class coro_http_connection
const char *data_ptr = asio::buffer_cast<const char *>(head_buf_.data());
auto status = ws_.parse_header(data_ptr, ws_.len_bytes());
if (status == ws_header_status::complete) {
ws_.reset_len_bytes();
head_buf_.consume(head_buf_.size());

std::span<char> payload{};
auto payload_length = ws_.payload_length();
if (payload_length > 0) {
Expand Down Expand Up @@ -353,8 +354,14 @@ class coro_http_connection
break;
case cinatra::ws_frame_type::WS_OPENING_FRAME:
continue;
case ws_frame_type::WS_INCOMPLETE_TEXT_FRAME:
case ws_frame_type::WS_INCOMPLETE_BINARY_FRAME:
result.eof = false;
result.data = {payload.data(), payload.size()};
break;
case cinatra::ws_frame_type::WS_TEXT_FRAME:
case cinatra::ws_frame_type::WS_BINARY_FRAME: {
result.eof = true;
result.data = {payload.data(), payload.size()};
} break;
case cinatra::ws_frame_type::WS_CLOSE_FRAME: {
Expand Down
10 changes: 7 additions & 3 deletions include/cinatra/websocket.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ class websocket {
}

int len_bytes() const { return len_bytes_; }
void reset_len_bytes() { len_bytes_ = SHORT_HEADER; }

ws_frame_type parse_payload(std::span<char> buf) {
// unmask data:
Expand Down Expand Up @@ -172,12 +173,12 @@ class websocket {
asio::buffer(src, length)};
}

inline std::string encode_frame(std::string &data, opcode op,
bool need_mask) {
std::string encode_frame(std::span<char> &data, opcode op, bool need_mask,
bool eof = true) {
std::string header;
/// Base header.
frame_header hdr{};
hdr.fin = 1;
hdr.fin = eof;
hdr.rsv1 = 0;
hdr.rsv2 = 0;
hdr.rsv3 = 0;
Expand Down Expand Up @@ -253,6 +254,9 @@ class websocket {

std::string format_close_payload(uint16_t code, char *message,
size_t length) {
if (length == 0) {
return "";
}
std::string close_payload;
if (code) {
close_payload.resize(length + 2);
Expand Down
64 changes: 25 additions & 39 deletions tests/test_cinatra_websocket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ async_simple::coro::Lazy<void> test_websocket(coro_http_client &client) {
return;
}

std::cout << "client get ws msg: " << data.resp_body << "\n";

bool r = data.resp_body.find("hello websocket") != std::string::npos ||
data.resp_body.find("test again") != std::string::npos;
CHECK(r);
Expand All @@ -89,47 +91,34 @@ async_simple::coro::Lazy<void> test_websocket(coro_http_client &client) {

auto result = co_await client.async_send_ws("hello websocket");
std::cout << result.net_err << "\n";
result = co_await client.async_send_ws("test again", /*need_mask = */ false);
result = co_await client.async_send_ws("test again", /*need_mask = */
false);
std::cout << result.net_err << "\n";
result = co_await client.async_send_ws_close("ws close");
std::cout << result.net_err << "\n";
}

TEST_CASE("test websocket") {
http_server server(std::thread::hardware_concurrency());
bool r = server.listen("0.0.0.0", "8090");
if (!r) {
std::cout << "listen failed."
<< "\n";
}
server.enable_timeout(false);
server.set_http_handler<GET, POST>("/ws", [](request &req, response &res) {
assert(req.get_content_type() == content_type::websocket);

req.on(ws_open, [](request &req) {
std::cout << "websocket start" << std::endl;
});

req.on(ws_message, [](request &req) {
auto part_data = req.get_part_data();
// echo
std::string str = std::string(part_data.data(), part_data.length());
req.get_conn<cinatra::NonSSL>()->send_ws_string(str);
std::cout << part_data.data() << std::endl;
});

req.on(ws_error, [](request &req) {
std::cout << "websocket pack error or network error" << std::endl;
});
});

std::promise<void> pr;
std::future<void> f = pr.get_future();
std::thread server_thread([&server, &pr]() {
pr.set_value();
server.run();
});
f.wait();
cinatra::coro_http_server server(1, 8090);
server.set_http_handler<cinatra::GET>(
"/ws",
[](coro_http_request &req,
coro_http_response &resp) -> async_simple::coro::Lazy<void> {
CHECK(req.get_content_type() == content_type::websocket);
websocket_result result{};
while (true) {
result = co_await req.get_conn()->read_websocket();
if (result.ec) {
break;
}

auto ec = co_await req.get_conn()->write_websocket(result.data);
if (ec) {
break;
}
}
});
server.async_start();

std::this_thread::sleep_for(std::chrono::milliseconds(100));

Expand All @@ -138,12 +127,9 @@ TEST_CASE("test websocket") {

async_simple::coro::syncAwait(test_websocket(*client));

client->async_close();

std::this_thread::sleep_for(std::chrono::milliseconds(300));

server.stop();
server_thread.join();
// client->async_close();
}

void test_websocket_content(size_t len) {
Expand Down
Loading

0 comments on commit f096469

Please sign in to comment.