Skip to content

Commit

Permalink
Coro http download server (qicosmos#452)
Browse files Browse the repository at this point in the history
  • Loading branch information
qicosmos authored Dec 11, 2023
1 parent f15d84b commit 5f32441
Show file tree
Hide file tree
Showing 8 changed files with 250 additions and 12 deletions.
11 changes: 6 additions & 5 deletions include/cinatra/coro_http_client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -144,11 +144,11 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
};

coro_http_client(asio::io_context::executor_type executor)
: socket_(std::make_shared<socket_t>(executor)),
: executor_wrapper_(executor),
timer_(&executor_wrapper_),
socket_(std::make_shared<socket_t>(executor)),
read_buf_(socket_->read_buf_),
chunked_buf_(socket_->chunked_buf_),
executor_wrapper_(executor),
timer_(&executor_wrapper_) {}
chunked_buf_(socket_->chunked_buf_) {}

coro_http_client(
coro_io::ExecutorWrapper<> *executor = coro_io::get_global_executor())
Expand Down Expand Up @@ -644,7 +644,7 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
}
std::error_code err_code;
timer_.cancel(err_code);
auto ret = co_await std::move(future);
co_await std::move(future);
if (is_timeout_) {
co_return std::make_error_code(std::errc::timed_out);
}
Expand Down Expand Up @@ -752,6 +752,7 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {

req_context<> ctx{};
if (range.empty()) {
add_header("Transfer-Encoding", "chunked");
ctx = {req_content_type::none, "", "", std::move(file)};
}
else {
Expand Down
41 changes: 41 additions & 0 deletions include/cinatra/coro_http_connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,9 @@ class coro_http_connection
#endif

async_simple::coro::Lazy<void> start() {
#ifdef CINATRA_ENABLE_SSL
bool has_shake = false;
#endif
while (true) {
#ifdef CINATRA_ENABLE_SSL
if (use_ssl_ && !has_shake) {
Expand Down Expand Up @@ -220,6 +222,45 @@ class coro_http_connection
co_return true;
}

async_simple::coro::Lazy<bool> write_data(std::string_view message) {
std::vector<asio::const_buffer> buffers;
buffers.push_back(asio::buffer(message));
auto [ec, _] = co_await async_write(buffers);
if (ec) {
CINATRA_LOG_ERROR << "async_write error: " << ec.message();
close();
co_return false;
}

if (!keep_alive_) {
// now in io thread, so can close socket immediately.
close();
}

co_return true;
}

async_simple::coro::Lazy<bool> write_chunked_data(std::string_view buf,
bool eof) {
std::string chunk_size_str = "";
std::vector<asio::const_buffer> buffers =
to_chunked_buffers<asio::const_buffer>(buf.data(), buf.length(),
chunk_size_str, eof);
auto [ec, _] = co_await async_write(std::move(buffers));
if (ec) {
CINATRA_LOG_ERROR << "async_write error: " << ec.message();
close();
co_return false;
}

if (!keep_alive_) {
// now in io thread, so can close socket immediately.
close();
}

co_return true;
}

bool sync_reply() { return async_simple::coro::syncAwait(reply()); }

async_simple::coro::Lazy<bool> begin_chunked() {
Expand Down
2 changes: 2 additions & 0 deletions include/cinatra/coro_http_request.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ class coro_http_request {
return is_chunk;
}

bool is_ranges() { return parser_.is_ranges(); }

content_type get_content_type() {
static content_type thread_local content_type = get_content_type_impl();
return content_type;
Expand Down
157 changes: 156 additions & 1 deletion include/cinatra/coro_http_server.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include <asio/dispatch.hpp>
#include <cstdint>
#include <fstream>
#include <mutex>
#include <type_traits>

Expand All @@ -10,6 +11,8 @@
#include "async_simple/coro/Lazy.h"
#include "cinatra/coro_http_response.hpp"
#include "cinatra/coro_http_router.hpp"
#include "cinatra/mime_types.hpp"
#include "cinatra/utils.hpp"
#include "cinatra_log_wrapper.hpp"
#include "coro_http_connection.hpp"
#include "ylt/coro_io/coro_io.hpp"
Expand Down Expand Up @@ -78,7 +81,7 @@ class coro_http_server {
promise.setValue(ec);
}

return std::move(future);
return future;
}

// only call once, not thread safe.
Expand Down Expand Up @@ -150,6 +153,139 @@ class coro_http_server {
}
}

void set_transfer_chunked_size(size_t size) { chunked_size_ = size; }

void set_static_res_handler(std::string_view uri_suffix = "",
std::string file_path = "www") {
bool has_double_dot = (file_path.find("..") != std::string::npos) ||
(uri_suffix.find("..") != std::string::npos);
if (std::filesystem::path(file_path).has_root_path() ||
std::filesystem::path(uri_suffix).has_root_path() || has_double_dot) {
CINATRA_LOG_ERROR << "invalid file path: " << file_path;
std::exit(1);
}

if (!uri_suffix.empty()) {
static_dir_router_path_ =
std::filesystem::path(uri_suffix).make_preferred().string();
}

if (!file_path.empty()) {
static_dir_ = std::filesystem::path(file_path).make_preferred().string();
}
else {
static_dir_ = fs::absolute(fs::current_path().string()).string();
}

files_.clear();
for (const auto &file :
std::filesystem::recursive_directory_iterator(static_dir_)) {
if (!file.is_directory()) {
files_.push_back(file.path().string());
}
}

std::filesystem::path router_path =
std::filesystem::path(static_dir_router_path_);

std::string uri;
for (auto &file : files_) {
auto relative_path =
std::filesystem::path(file.substr(static_dir_.length())).string();
if (size_t pos = relative_path.find('\\') != std::string::npos) {
replace_all(relative_path, "\\", "/");
}
uri = std::string("/")
.append(static_dir_router_path_)
.append(relative_path);

set_http_handler<cinatra::GET>(
uri,
[this, file_name = file](
coro_http_request &req,
coro_http_response &resp) -> async_simple::coro::Lazy<void> {
bool is_chunked = req.is_chunked();
bool is_ranges = req.is_ranges();
if (!is_chunked && !is_ranges) {
resp.set_status(status_type::not_implemented);
co_return;
}

std::string_view extension = get_extension(file_name);
std::string_view mime = get_mime_type(extension);

std::string content;
detail::resize(content, chunked_size_);

coro_io::coro_file in_file{};
co_await in_file.async_open(file_name, coro_io::flags::read_only);
if (!in_file.is_open()) {
resp.set_status_and_content(status_type::not_found,
file_name + "not found");
co_return;
}

if (is_chunked) {
resp.set_format_type(format_type::chunked);
bool ok;
if (ok = co_await resp.get_conn()->begin_chunked(); !ok) {
co_return;
}

while (true) {
auto [ec, size] =
co_await in_file.async_read(content.data(), content.size());
if (ec) {
resp.set_status(status_type::no_content);
co_await resp.get_conn()->reply();
co_return;
}

bool r = co_await resp.get_conn()->write_chunked(
std::string_view(content.data(), size));
if (!r) {
co_return;
}

if (in_file.eof()) {
co_await resp.get_conn()->end_chunked();
break;
}
}
}
else if (is_ranges) {
auto range_header = build_range_header(
mime, file_name, coro_io::coro_file::file_size(file_name));
resp.set_delay(true);
bool r = co_await req.get_conn()->write_data(range_header);
if (!r) {
co_return;
}

while (true) {
auto [ec, size] =
co_await in_file.async_read(content.data(), content.size());
if (ec) {
resp.set_status(status_type::no_content);
co_await resp.get_conn()->reply();
co_return;
}

r = co_await req.get_conn()->write_data(
std::string_view(content.data(), size));
if (!r) {
co_return;
}

if (in_file.eof()) {
break;
}
}
}
});
}
}

void set_check_duration(auto duration) { check_duration_ = duration; }

void set_timeout_duration(
Expand Down Expand Up @@ -305,6 +441,20 @@ class coro_http_server {
}
}

std::string build_range_header(std::string_view mime,
std::string_view filename, size_t file_size) {
std::string header_str =
"HTTP/1.1 200 OK\r\nAccess-Control-Allow-origin: "
"*\r\nAccept-Ranges: bytes\r\n";
header_str.append("Content-Disposition: attachment;filename=");
header_str.append(filename).append("\r\n");
header_str.append("Connection: keep-alive\r\n");
header_str.append("Content-Type: ").append(mime).append("\r\n");
header_str.append("Content-Length: ");
header_str.append(std::to_string(file_size)).append("\r\n\r\n");
return header_str;
}

private:
std::unique_ptr<coro_io::io_context_pool> pool_;
asio::io_context *out_ctx_ = nullptr;
Expand All @@ -325,6 +475,11 @@ class coro_http_server {
asio::steady_timer check_timer_;
bool need_check_ = false;
std::atomic<bool> stop_timer_ = false;

std::string static_dir_router_path_ = "";
std::string static_dir_ = "";
std::vector<std::string> files_;
size_t chunked_size_ = 1024 * 10;
#ifdef CINATRA_ENABLE_SSL
std::string cert_file_;
std::string key_file_;
Expand Down
2 changes: 1 addition & 1 deletion include/cinatra/http_parser.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ class http_parser {
}

bool is_ranges() const {
auto transfer_encoding = this->get_header_value("Accept-Ranges"sv);
auto transfer_encoding = this->get_header_value("Range"sv);
return !transfer_encoding.empty();
}

Expand Down
4 changes: 2 additions & 2 deletions include/cinatra/uri.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ struct context {
port(u.get_port()),
path(u.get_path()),
query(u.get_query()),
method(mthd),
body(std::move(b)) {}
body(std::move(b)),
method(mthd) {}
};
} // namespace cinatra
1 change: 0 additions & 1 deletion include/cinatra/websocket.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,6 @@ class websocket {
std::string_view sec_ws_key_;

size_t payload_length_ = 0;
size_t left_payload_length_ = 0;

size_t left_header_len_ = 0;
uint8_t mask_[4] = {};
Expand Down
44 changes: 42 additions & 2 deletions tests/test_coro_http_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -766,7 +766,6 @@ TEST_CASE("test websocket with different message sizes") {
return;
}

size_t size = data.resp_body.size();
std::cout << "ws msg len: " << data.resp_body.size() << std::endl;
REQUIRE(data.resp_body == medium_message);
});
Expand Down Expand Up @@ -861,7 +860,6 @@ TEST_CASE("test websocket with message max_size limit") {
return;
}

size_t size = data.resp_body.size();
std::cout << "ws msg len: " << data.resp_body.size() << std::endl;
REQUIRE(data.resp_body == medium_message);
});
Expand Down Expand Up @@ -922,6 +920,48 @@ TEST_CASE("test ssl server") {
}
#endif

TEST_CASE("test http download server") {
cinatra::coro_http_server server(1, 9001);
std::string filename = "test_download.txt";
create_file(filename, 1010);

// curl http://127.0.0.1:9001/download/test_download.txt will download
// test_download.txt file
server.set_transfer_chunked_size(100);
server.set_static_res_handler("download", "");
server.async_start();
std::this_thread::sleep_for(200ms);

{
coro_http_client client{};
auto result = async_simple::coro::syncAwait(client.async_download(
"http://127.0.0.1:9001/download/test_download.txt", "download.txt"));

CHECK(result.status == 200);
std::string download_file = fs::absolute("download.txt").string();
std::ifstream ifs(download_file, std::ios::binary);
std::string content((std::istreambuf_iterator<char>(ifs)),
(std::istreambuf_iterator<char>()));
CHECK(content.size() == 1010);
CHECK(content[0] == 'A');
}

{
coro_http_client client{};
auto result = async_simple::coro::syncAwait(client.async_download(
"http://127.0.0.1:9001/download/test_download.txt", "download.txt",
"0-"));

CHECK(result.status == 200);
std::string download_file = fs::absolute("download.txt").string();
std::ifstream ifs(download_file, std::ios::binary);
std::string content((std::istreambuf_iterator<char>(ifs)),
(std::istreambuf_iterator<char>()));
CHECK(content.size() == 1010);
CHECK(content[0] == 'A');
}
}

DOCTEST_MSVC_SUPPRESS_WARNING_WITH_PUSH(4007)
int main(int argc, char **argv) { return doctest::Context(argc, argv).run(); }
DOCTEST_MSVC_SUPPRESS_WARNING_POP

0 comments on commit 5f32441

Please sign in to comment.