Skip to content

Commit

Permalink
async upload file no copy
Browse files Browse the repository at this point in the history
  • Loading branch information
qicosmos committed Jul 2, 2024
1 parent 3d46ee8 commit 7bf3244
Show file tree
Hide file tree
Showing 12 changed files with 751 additions and 120 deletions.
69 changes: 37 additions & 32 deletions include/ylt/coro_io/coro_file.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,22 +112,22 @@ class coro_file {
public:
#if defined(YLT_ENABLE_FILE_IO_URING)
coro_file(
coro_io::ExecutorWrapper<>* executor = coro_io::get_global_executor())
coro_io::ExecutorWrapper<> *executor = coro_io::get_global_executor())
: coro_file(executor->get_asio_executor()) {}

coro_file(asio::io_context::executor_type executor)
: executor_wrapper_(executor) {}
#else

coro_file(coro_io::ExecutorWrapper<>* executor =
coro_file(coro_io::ExecutorWrapper<> *executor =
coro_io::get_global_block_executor())
: coro_file(executor->get_asio_executor()) {}

coro_file(asio::io_context::executor_type executor)
: executor_wrapper_(executor) {}
#endif

bool is_open() {
bool is_open() const {
if (type_ == read_type::pread) {
return fd_file_ != nullptr;
}
Expand All @@ -150,7 +150,7 @@ class coro_file {
#endif
}

bool eof() { return eof_; }
bool eof() const { return eof_; }

void close() {
if (stream_file_) {
Expand All @@ -161,20 +161,22 @@ class coro_file {
}
}

static size_t file_size(std::string_view filepath) {
std::error_code ec;
size_t size = std::filesystem::file_size(filepath, ec);
return size;
size_t file_size(std::error_code ec) const noexcept {
return std::filesystem::file_size(file_path_, ec);
}

size_t file_size() const { return std::filesystem::file_size(file_path_); }

std::string_view file_path() const { return file_path_; }

async_simple::coro::Lazy<std::pair<std::error_code, size_t>> async_pread(
size_t offset, char* data, size_t size) {
size_t offset, char *data, size_t size) {
if (type_ != read_type::pread) {
co_return std::make_pair(
std::make_error_code(std::errc::bad_file_descriptor), 0);
}
#if defined(ASIO_WINDOWS)
auto pread = [](int fd, void* buf, uint64_t count,
auto pread = [](int fd, void *buf, uint64_t count,
uint64_t offset) -> int64_t {
DWORD bytes_read = 0;
OVERLAPPED overlapped;
Expand All @@ -195,13 +197,13 @@ class coro_file {
}

async_simple::coro::Lazy<std::error_code> async_pwrite(size_t offset,
const char* data,
const char *data,
size_t size) {
if (type_ != read_type::pread) {
co_return std::make_error_code(std::errc::bad_file_descriptor);
}
#if defined(ASIO_WINDOWS)
auto pwrite = [](int fd, const void* buf, uint64_t count,
auto pwrite = [](int fd, const void *buf, uint64_t count,
uint64_t offset) -> int64_t {
DWORD bytes_write = 0;
OVERLAPPED overlapped;
Expand All @@ -218,7 +220,7 @@ class coro_file {
return bytes_write;
};
#endif
auto result = co_await async_prw(pwrite, false, offset, (char*)data, size);
auto result = co_await async_prw(pwrite, false, offset, (char *)data, size);
co_return result.first;
}

Expand All @@ -240,7 +242,7 @@ class coro_file {
stream_file_ = std::make_shared<asio::random_access_file>(
executor_wrapper_.get_asio_executor());
}
} catch (std::exception& ex) {
} catch (std::exception &ex) {
stream_file_ = nullptr;
std::cout << "line " << __LINE__ << " coro_file create failed"
<< ex.what() << "\n";
Expand All @@ -267,7 +269,7 @@ class coro_file {
}

std::error_code seek_ec;
reinterpret_cast<asio::stream_file*>(stream_file_.get())
reinterpret_cast<asio::stream_file *>(stream_file_.get())
->seek(offset, static_cast<asio::file_base::seek_basis>(whence),
seek_ec);
if (seek_ec) {
Expand All @@ -277,15 +279,15 @@ class coro_file {
}

async_simple::coro::Lazy<std::pair<std::error_code, size_t>> async_read_at(
uint64_t offset, char* data, size_t size) {
uint64_t offset, char *data, size_t size) {
if (type_ != read_type::uring_random) {
co_return std::make_pair(
std::make_error_code(std::errc::bad_file_descriptor), 0);
}

auto [ec, read_size] = co_await coro_io::async_read_at(
offset,
*reinterpret_cast<asio::random_access_file*>(stream_file_.get()),
*reinterpret_cast<asio::random_access_file *>(stream_file_.get()),
asio::buffer(data, size));

if (ec == asio::error::eof) {
Expand All @@ -297,28 +299,28 @@ class coro_file {
}

async_simple::coro::Lazy<std::error_code> async_write_at(uint64_t offset,
const char* data,
const char *data,
size_t size) {
if (type_ != read_type::uring_random) {
co_return std::make_error_code(std::errc::bad_file_descriptor);
}

auto [ec, write_size] = co_await coro_io::async_write_at(
offset,
*reinterpret_cast<asio::random_access_file*>(stream_file_.get()),
*reinterpret_cast<asio::random_access_file *>(stream_file_.get()),
asio::buffer(data, size));
co_return ec;
}

async_simple::coro::Lazy<std::pair<std::error_code, size_t>> async_read(
char* data, size_t size) {
char *data, size_t size) {
if (type_ != read_type::uring) {
co_return std::make_pair(
std::make_error_code(std::errc::bad_file_descriptor), 0);
}

auto [ec, read_size] = co_await coro_io::async_read(
*reinterpret_cast<asio::stream_file*>(stream_file_.get()),
*reinterpret_cast<asio::stream_file *>(stream_file_.get()),
asio::buffer(data, size));
if (ec == asio::error::eof) {
eof_ = true;
Expand All @@ -328,14 +330,14 @@ class coro_file {
co_return std::make_pair(std::error_code{}, read_size);
}

async_simple::coro::Lazy<std::error_code> async_write(const char* data,
async_simple::coro::Lazy<std::error_code> async_write(const char *data,
size_t size) {
if (type_ != read_type::uring) {
co_return std::make_error_code(std::errc::bad_file_descriptor);
}

auto [ec, write_size] = co_await coro_io::async_write(
*reinterpret_cast<asio::stream_file*>(stream_file_.get()),
*reinterpret_cast<asio::stream_file *>(stream_file_.get()),
asio::buffer(data, size));

co_return ec;
Expand Down Expand Up @@ -372,24 +374,26 @@ class coro_file {
async_simple::coro::Lazy<bool> async_open(std::string filepath,
int open_mode = flags::read_write,
read_type type = read_type::fread) {
file_path_ = std::move(filepath);
type_ = type;
if (type_ == read_type::pread) {
co_return open_fd(filepath, open_mode);
co_return open_fd(file_path_, open_mode);
}

if (stream_file_ != nullptr) {
co_return true;
}

auto result = co_await coro_io::post(
[this, &filepath, open_mode] {
auto fptr = fopen(filepath.data(), str_mode(open_mode).data());
[this, open_mode] {
auto fptr =
fopen(this->file_path_.data(), str_mode(open_mode).data());
if (fptr == nullptr) {
std::cout << "line " << __LINE__ << " coro_file open failed "
<< filepath << "\n";
<< this->file_path_ << "\n";
return false;
}
stream_file_ = std::shared_ptr<FILE>(fptr, [](FILE* ptr) {
stream_file_ = std::shared_ptr<FILE>(fptr, [](FILE *ptr) {
fclose(ptr);
});
return true;
Expand All @@ -399,7 +403,7 @@ class coro_file {
}

async_simple::coro::Lazy<std::pair<std::error_code, size_t>> async_read(
char* data, size_t size) {
char *data, size_t size) {
if (type_ != read_type::fread) {
co_return std::make_pair(
std::make_error_code(std::errc::bad_file_descriptor), 0);
Expand All @@ -421,7 +425,7 @@ class coro_file {
co_return result.value();
}

async_simple::coro::Lazy<std::error_code> async_write(const char* data,
async_simple::coro::Lazy<std::error_code> async_write(const char *data,
size_t size) {
if (type_ != read_type::fread) {
co_return std::make_error_code(std::errc::bad_file_descriptor);
Expand All @@ -443,7 +447,7 @@ class coro_file {

private:
async_simple::coro::Lazy<std::pair<std::error_code, size_t>> async_prw(
auto io_func, bool is_read, size_t offset, char* buf, size_t size) {
auto io_func, bool is_read, size_t offset, char *buf, size_t size) {
std::function<int()> func = [=, this] {
int fd = *fd_file_;
return io_func(fd, buf, size, offset);
Expand Down Expand Up @@ -483,7 +487,7 @@ class coro_file {
return false;
}

fd_file_ = std::shared_ptr<int>(new int(fd), [](int* ptr) {
fd_file_ = std::shared_ptr<int>(new int(fd), [](int *ptr) {
#if defined(ASIO_WINDOWS)
_close(*ptr);
#else
Expand Down Expand Up @@ -536,6 +540,7 @@ class coro_file {
#endif
coro_io::ExecutorWrapper<> executor_wrapper_;
std::shared_ptr<int> fd_file_;
std::string file_path_;
std::atomic<bool> eof_ = false;
};
} // namespace coro_io
51 changes: 51 additions & 0 deletions include/ylt/coro_io/coro_io.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@

#include "io_context_pool.hpp"
#include "ylt/util/type_traits.h"
#ifdef __linux__
#include <sys/sendfile.h>
#endif

namespace coro_io {

Expand Down Expand Up @@ -503,4 +506,52 @@ inline std::error_code connect(executor_t &executor,
return error;
}

#ifdef __linux__

inline async_simple::coro::Lazy<std::pair<std::error_code, std::size_t>>
async_sendfile(asio::ip::tcp::socket &socket, int fd, off_t offset,
size_t size) noexcept {
std::error_code ec;
std::size_t least_bytes = size;
if (!ec) [[likely]] {
if (!socket.native_non_blocking()) {
socket.native_non_blocking(true, ec);
if (ec) {
co_return std::pair{ec, 0};
}
}
while (true) {
// Try the system call.
errno = 0;
int n = ::sendfile(socket.native_handle(), fd, &offset,
std::min(std::size_t{65536}, least_bytes));
ec = asio::error_code(n < 0 ? errno : 0,
asio::error::get_system_category());
least_bytes -= ec ? 0 : n;
// total_bytes_transferred += ec ? 0 : n;
// Retry operation immediately if interrupted by signal.
if (ec == asio::error::interrupted) [[unlikely]]
continue;
// Check if we need to run the operation again.
if (ec == asio::error::would_block || ec == asio::error::try_again)
[[unlikely]] {
callback_awaitor<std::error_code> non_block_awaitor;
// We have to wait for the socket to become ready again.
ec = co_await non_block_awaitor.await_resume([&](auto handler) {
socket.async_wait(asio::ip::tcp::socket::wait_write,
[handler](const auto &ec) {
handler.set_value_then_resume(ec);
});
});
continue;
}
if (ec || n == 0 || least_bytes == 0) [[unlikely]] { // End of File
break;
}
// Loop around to try calling sendfile again.
}
}
co_return std::pair{ec, size - least_bytes};
}
#endif
} // namespace coro_io
79 changes: 79 additions & 0 deletions include/ylt/standalone/cinatra/brzip.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
#pragma once
#include <brotli/decode.h>
#include <brotli/encode.h>

#include <array>
#include <sstream>
#include <string>
#include <string_view>

namespace cinatra::br_codec {

#define BROTLI_BUFFER_SIZE 1024

inline bool brotli_compress(std::string_view input, std::string &output) {
auto instance = BrotliEncoderCreateInstance(nullptr, nullptr, nullptr);
std::array<uint8_t, BROTLI_BUFFER_SIZE> buffer;
std::stringstream result;

size_t available_in = input.size(), available_out = buffer.size();
const uint8_t *next_in = reinterpret_cast<const uint8_t *>(input.data());
uint8_t *next_out = buffer.data();

do {
int ret = BrotliEncoderCompressStream(instance, BROTLI_OPERATION_FINISH,
&available_in, &next_in,
&available_out, &next_out, nullptr);
if (!ret)
return false;
result.write(reinterpret_cast<const char *>(buffer.data()),
buffer.size() - available_out);
available_out = buffer.size();
next_out = buffer.data();
} while (!(available_in == 0 && BrotliEncoderIsFinished(instance)));

BrotliEncoderDestroyInstance(instance);
output = result.str();
return true;
}

inline bool brotli_decompress(std::string_view input,
std::string &decompressed) {
if (input.size() == 0)
return false;

size_t available_in = input.size();
auto next_in = (const uint8_t *)(input.data());
decompressed = std::string(available_in * 3, 0);
size_t available_out = decompressed.size();
auto next_out = (uint8_t *)(decompressed.data());
size_t total_out{0};
bool done = false;
auto s = BrotliDecoderCreateInstance(nullptr, nullptr, nullptr);
while (!done) {
auto result = BrotliDecoderDecompressStream(
s, &available_in, &next_in, &available_out, &next_out, &total_out);
if (result == BROTLI_DECODER_RESULT_SUCCESS) {
decompressed.resize(total_out);
BrotliDecoderDestroyInstance(s);
return true;
}
else if (result == BROTLI_DECODER_RESULT_NEEDS_MORE_OUTPUT) {
if (total_out != decompressed.size()) {
return false;
}
decompressed.resize(total_out * 2);
next_out = (uint8_t *)(decompressed.data() + total_out);
available_out = total_out;
}
else {
decompressed.resize(0);
BrotliDecoderDestroyInstance(s);
return true;
}
}
return true;
}
} // namespace cinatra::br_codec

// namespace cinatra::br_codec
Loading

0 comments on commit 7bf3244

Please sign in to comment.