diff --git a/include/cinatra/ylt/coro_io/coro_file.hpp b/include/cinatra/ylt/coro_io/coro_file.hpp index 9e551a33..e3916b8f 100644 --- a/include/cinatra/ylt/coro_io/coro_file.hpp +++ b/include/cinatra/ylt/coro_io/coro_file.hpp @@ -14,19 +14,24 @@ * limitations under the License. */ #pragma once -#include +#include +#include +#include + +#include +#include #include #include #include -#include "async_simple/Promise.h" -#include "async_simple/Traits.h" -#include "async_simple/coro/FutureAwaiter.h" #include "io_context_pool.hpp" #if defined(ENABLE_FILE_IO_URING) #include #include #endif +#include + +#include #include #include #include @@ -37,8 +42,6 @@ #include #include -#include "asio/error.hpp" -#include "async_simple/coro/Lazy.h" #include "coro_io.hpp" namespace coro_io { @@ -90,6 +93,18 @@ enum flags { #endif // defined(ASIO_WINDOWS) }; +enum class read_type { +#if defined(ENABLE_FILE_IO_URING) + uring, + uring_random, +#else + fread, +#endif +#if defined(__GNUC__) + pread, +#endif +}; + class coro_file { public: #if defined(ENABLE_FILE_IO_URING) @@ -109,27 +124,33 @@ class coro_file { : executor_wrapper_(executor) {} #endif - bool is_open() { return stream_file_ != nullptr; } + bool is_open() { return stream_file_ != nullptr || fd_file_ != nullptr; } void flush() { #if defined(ENABLE_FILE_IO_URING) #else - if (stream_file_) { - auto fptr = stream_file_.get(); -#if defined(__GNUC__) and defined(USE_PREAD_WRITE) - int fd = *stream_file_; - fsync(fd); -#else - fflush(fptr); + if (fd_file_) { +#if defined(__GNUC__) + fsync(*fd_file_); #endif } + else if (stream_file_) { + fflush(stream_file_.get()); + } #endif } bool eof() { return eof_; } - void close() { stream_file_.reset(); } + void close() { + if (stream_file_) { + stream_file_.reset(); + } + else if (fd_file_) { + fd_file_.reset(); + } + } static size_t file_size(std::string_view filepath) { std::error_code ec; @@ -137,12 +158,82 @@ class coro_file { return size; } +#if defined(__GNUC__) + bool open_fd(std::string_view filepath, int open_mode = flags::read_write) { + if (fd_file_) { + return true; + } + + int fd = open(filepath.data(), open_mode); + if (fd < 0) { + return false; + } + + fd_file_ = std::shared_ptr(new int(fd), [](int* ptr) { + ::close(*ptr); + delete ptr; + }); + return true; + } + + async_simple::coro::Lazy> async_prw( + auto io_func, bool is_read, size_t offset, char* buf, size_t size) { + std::function func = [=, this] { + int fd = *fd_file_; + return io_func(fd, buf, size, offset); + }; + + std::error_code ec{}; + size_t op_size = 0; + + auto len_val = co_await coro_io::post(std::move(func), &executor_wrapper_); + int len = len_val.value(); + if (len == 0) { + if (is_read) { + eof_ = true; + } + } + else if (len > 0) { + op_size = len; + } + else { + ec = std::make_error_code(std::errc::io_error); + } + + co_return std::make_pair(ec, op_size); + } + + async_simple::coro::Lazy> async_pread( + size_t offset, char* data, size_t size) { + co_return co_await async_prw(pread, true, offset, data, size); + } + + async_simple::coro::Lazy async_pwrite(size_t offset, + const char* data, + size_t size) { + auto result = co_await async_prw(pwrite, false, offset, (char*)data, size); + co_return result.first; + } +#endif + #if defined(ENABLE_FILE_IO_URING) async_simple::coro::Lazy async_open(std::string_view filepath, - int open_mode = flags::read_write) { + int open_mode = flags::read_write, + read_type type = read_type::uring) { + type_ = type; + if (type == read_type::pread) { + co_return open_fd(filepath, open_mode); + } + try { - stream_file_ = std::make_unique( - executor_wrapper_.get_asio_executor()); + if (type == read_type::uring) { + stream_file_ = std::make_shared( + executor_wrapper_.get_asio_executor()); + } + else { + stream_file_ = std::make_shared( + executor_wrapper_.get_asio_executor()); + } } catch (std::exception& ex) { std::cout << ex.what() << "\n"; co_return false; @@ -151,6 +242,7 @@ class coro_file { std::error_code ec; stream_file_->open(filepath.data(), static_cast(open_mode), ec); + if (ec) { std::cout << ec.message() << "\n"; co_return false; @@ -160,82 +252,74 @@ class coro_file { } bool seek(long offset, int whence) { + if (type_ != read_type::uring) { + return false; + } + + assert(stream_file_); std::error_code seek_ec; - stream_file_->seek(offset, static_cast(whence), - seek_ec); + reinterpret_cast(stream_file_.get()) + ->seek(offset, static_cast(whence), + seek_ec); if (seek_ec) { return false; } return true; } - async_simple::coro::Lazy> async_read( - char* data, size_t size) { - size_t left_size = size; - size_t offset = 0; - size_t read_total = 0; - while (left_size) { - auto [ec, read_size] = co_await coro_io::async_read_some( - *stream_file_, asio::buffer(data + offset, size - offset)); - if (ec) { - if (ec == asio::error::eof) { - eof_ = true; - co_return std::make_pair(std::error_code{}, read_total); - } - - co_return std::make_pair(ec, 0); - } + async_simple::coro::Lazy> async_read_at( + uint64_t offset, char* data, size_t size) { + assert(stream_file_); - if (read_size > size) { - // if read_size is very large, it means the size if negative, and there - // is an error occurred. - co_return std::make_pair( - std::make_error_code(std::errc::invalid_argument), 0); - } + auto [ec, read_size] = co_await coro_io::async_read_at( + offset, + *reinterpret_cast(stream_file_.get()), + asio::buffer(data, size)); - read_total += read_size; - - left_size -= read_size; - offset += read_size; - seek_offset_ += read_size; - std::error_code seek_ec; - stream_file_->seek(seek_offset_, asio::file_base::seek_basis::seek_set, - seek_ec); - if (seek_ec) { - co_return std::make_pair(std::make_error_code(std::errc::invalid_seek), - 0); - } + if (ec == asio::error::eof) { + eof_ = true; + co_return std::make_pair(std::error_code{}, read_size); } - co_return std::make_pair(std::error_code{}, read_total); + co_return std::make_pair(std::error_code{}, read_size); + } + + async_simple::coro::Lazy async_write_at(uint64_t offset, + const char* data, + size_t size) { + assert(stream_file_); + + auto [ec, write_size] = co_await coro_io::async_write_at( + offset, + *reinterpret_cast(stream_file_.get()), + asio::buffer(data, size)); + co_return ec; + } + + async_simple::coro::Lazy> async_read( + char* data, size_t size) { + assert(stream_file_); + + auto [ec, read_size] = co_await coro_io::async_read( + *reinterpret_cast(stream_file_.get()), + asio::buffer(data, size)); + if (ec == asio::error::eof) { + eof_ = true; + co_return std::make_pair(std::error_code{}, read_size); + } + + co_return std::make_pair(std::error_code{}, read_size); } async_simple::coro::Lazy async_write(const char* data, size_t size) { - size_t left_size = size; - size_t offset = 0; - while (left_size) { - auto [ec, write_size] = co_await coro_io::async_write_some( - *stream_file_, asio::buffer(data, size)); - - if (ec) { - co_return ec; - } + assert(stream_file_); - left_size -= write_size; - if (left_size == 0) { - co_return ec; - } - offset += write_size; - std::error_code seek_ec; - stream_file_->seek(offset, asio::file_base::seek_basis::seek_set, - seek_ec); - if (seek_ec) { - co_return seek_ec; - } - } + auto [ec, write_size] = co_await coro_io::async_write( + *reinterpret_cast(stream_file_.get()), + asio::buffer(data, size)); - co_return std::error_code{}; + co_return ec; } #else std::string str_mode(int open_mode) { @@ -258,71 +342,21 @@ class coro_file { } } -#if defined(__GNUC__) and defined(USE_PREAD_WRITE) - async_simple::coro::Lazy async_open(std::string filepath, - int open_mode = flags::read_write) { - if (stream_file_) { - co_return true; - } - - int fd = open(filepath.data(), open_mode); - if (fd < 0) { - co_return false; - } - - stream_file_ = std::shared_ptr(new int(fd), [](int* ptr) { - ::close(*ptr); - delete ptr; - }); - - co_return true; - } - - async_simple::coro::Lazy> async_prw( - auto io_func, bool is_read, size_t offset, char* buf, size_t size) { - std::function func = [=, this] { - int fd = *stream_file_; - return io_func(fd, buf, size, offset); - }; - - std::error_code ec{}; - size_t op_size = 0; - - auto len_val = co_await coro_io::post(std::move(func), &executor_wrapper_); - int len = len_val.value(); - if (len == 0) { - if (is_read) { - eof_ = true; - } - } - else if (len > 0) { - op_size = len; - } - else { - ec = std::make_error_code(std::errc::io_error); - } - - co_return std::make_pair(ec, op_size); - } - - async_simple::coro::Lazy> async_read( - size_t offset, char* data, size_t size) { - co_return co_await async_prw(pread, true, offset, data, size); - } - - async_simple::coro::Lazy async_write(size_t offset, - const char* data, - size_t size) { - auto result = co_await async_prw(pwrite, false, offset, (char*)data, size); - co_return result.first; - } -#else bool seek(long offset, int whence) { + assert(fd_file_ == nullptr); + return fseek(stream_file_.get(), offset, whence) == 0; } async_simple::coro::Lazy async_open(std::string filepath, - int open_mode = flags::read_write) { + int open_mode = flags::read_write, + read_type type = read_type::fread) { +#if defined(__GNUC__) + if (type == read_type::pread) { + co_return open_fd(filepath, open_mode); + } +#endif + if (stream_file_ != nullptr) { co_return true; } @@ -378,24 +412,17 @@ class coro_file { co_return result.value(); } -#endif - #endif private: #if defined(ENABLE_FILE_IO_URING) - std::unique_ptr stream_file_; - std::atomic seek_offset_ = 0; -#else - -#if defined(__GNUC__) and defined(USE_PREAD_WRITE) - std::shared_ptr stream_file_; + std::shared_ptr> stream_file_; + read_type type_ = read_type::uring; #else std::shared_ptr stream_file_; -#endif #endif coro_io::ExecutorWrapper<> executor_wrapper_; - + std::shared_ptr fd_file_; std::atomic eof_ = false; }; } // namespace coro_io diff --git a/include/cinatra/ylt/coro_io/coro_io.hpp b/include/cinatra/ylt/coro_io/coro_io.hpp index affbedc5..4e4829b6 100644 --- a/include/cinatra/ylt/coro_io/coro_io.hpp +++ b/include/cinatra/ylt/coro_io/coro_io.hpp @@ -1,23 +1,23 @@ #pragma once +#include #include +#include #include -#include -#include - -#include "asio/dispatch.hpp" -#include "async_simple/Executor.h" -#include "async_simple/coro/Sleep.h" - -#if defined(ENABLE_SSL) || defined(CINATRA_ENABLE_SSL) +#if defined(YLT_ENABLE_SSL) || defined(CINATRA_ENABLE_SSL) #include #endif #include +#include #include #include +#include #include #include +#include +#include +#include #include "../util/type_traits.h" #include "io_context_pool.hpp" @@ -122,6 +122,18 @@ async_read_some(Socket &socket, AsioBuffer &&buffer) noexcept { }); } +template +inline async_simple::coro::Lazy> +async_read_at(uint64_t offset, Socket &socket, AsioBuffer &&buffer) noexcept { + callback_awaitor> awaitor; + co_return co_await awaitor.await_resume([&](auto handler) { + asio::async_read_at(socket, offset, buffer, + [&, handler](const auto &ec, auto size) { + handler.set_value_then_resume(ec, size); + }); + }); +} + template inline async_simple::coro::Lazy> async_read( Socket &socket, AsioBuffer &&buffer) noexcept { @@ -180,6 +192,18 @@ async_write_some(Socket &socket, AsioBuffer &&buffer) noexcept { }); } +template +inline async_simple::coro::Lazy> +async_write_at(uint64_t offset, Socket &socket, AsioBuffer &&buffer) noexcept { + callback_awaitor> awaitor; + co_return co_await awaitor.await_resume([&](auto handler) { + asio::async_write_at(socket, offset, buffer, + [&, handler](const auto &ec, auto size) { + handler.set_value_then_resume(ec, size); + }); + }); +} + template inline async_simple::coro::Lazy async_connect( executor_t *executor, asio::ip::tcp::socket &socket, diff --git a/include/cinatra/ylt/coro_io/io_context_pool.hpp b/include/cinatra/ylt/coro_io/io_context_pool.hpp index 0b4faa9d..b3667e29 100644 --- a/include/cinatra/ylt/coro_io/io_context_pool.hpp +++ b/include/cinatra/ylt/coro_io/io_context_pool.hpp @@ -246,7 +246,7 @@ template inline T &g_io_context_pool( unsigned pool_size = std::thread::hardware_concurrency()) { static auto _g_io_context_pool = std::make_shared(pool_size); - static bool run_helper = [](auto pool) { + [[maybe_unused]] static bool run_helper = [](auto pool) { std::thread thrd{[pool] { pool->run(); }}; @@ -260,7 +260,7 @@ template inline T &g_block_io_context_pool( unsigned pool_size = std::thread::hardware_concurrency()) { static auto _g_io_context_pool = std::make_shared(pool_size); - static bool run_helper = [](auto pool) { + [[maybe_unused]] static bool run_helper = [](auto pool) { std::thread thrd{[pool] { pool->run(); }}; diff --git a/tests/test_corofile.cpp b/tests/test_corofile.cpp index 21e10b8d..67215367 100644 --- a/tests/test_corofile.cpp +++ b/tests/test_corofile.cpp @@ -69,31 +69,57 @@ void create_files(const std::vector& files, size_t file_size) { } } -#if defined(__GNUC__) and defined(USE_PREAD_WRITE) +#if defined(__GNUC__) TEST_CASE("coro_file pread and pwrite basic test") { std::string filename = "test.tmp"; create_files({filename}, 190); { coro_io::coro_file file{}; - async_simple::coro::syncAwait( - file.async_open(filename.data(), coro_io::flags::read_only)); + async_simple::coro::syncAwait(file.async_open( + filename.data(), coro_io::flags::read_only, coro_io::read_type::pread)); CHECK(file.is_open()); char buf[100]; - auto pair = async_simple::coro::syncAwait(file.async_read(0, buf, 10)); + auto pair = async_simple::coro::syncAwait(file.async_pread(0, buf, 10)); CHECK(std::string_view(buf, pair.second) == "AAAAAAAAAA"); CHECK(!file.eof()); - pair = async_simple::coro::syncAwait(file.async_read(10, buf, 100)); + pair = async_simple::coro::syncAwait(file.async_pread(10, buf, 100)); CHECK(!file.eof()); CHECK(pair.second == 100); - pair = async_simple::coro::syncAwait(file.async_read(110, buf, 100)); + pair = async_simple::coro::syncAwait(file.async_pread(110, buf, 100)); + CHECK(!file.eof()); + CHECK(pair.second == 80); + + // only read size equal 0 is eof. + pair = async_simple::coro::syncAwait(file.async_pread(200, buf, 100)); + CHECK(file.eof()); + CHECK(pair.second == 0); + } + +#if defined(YLT_ENABLE_FILE_IO_URING) + { + coro_io::coro_file file{}; + async_simple::coro::syncAwait( + file.async_open(filename.data(), coro_io::flags::read_only, + coro_io::read_type::uring_random)); + CHECK(file.is_open()); + + char buf[100]; + auto pair = async_simple::coro::syncAwait(file.async_read_at(0, buf, 10)); + CHECK(std::string_view(buf, pair.second) == "AAAAAAAAAA"); CHECK(!file.eof()); + + pair = async_simple::coro::syncAwait(file.async_read_at(10, buf, 100)); + CHECK(!file.eof()); + CHECK(pair.second == 100); + + pair = async_simple::coro::syncAwait(file.async_read_at(110, buf, 100)); CHECK(pair.second == 80); // only read size equal 0 is eof. - pair = async_simple::coro::syncAwait(file.async_read(200, buf, 100)); + pair = async_simple::coro::syncAwait(file.async_read_at(200, buf, 100)); CHECK(file.eof()); CHECK(pair.second == 0); } @@ -101,30 +127,60 @@ TEST_CASE("coro_file pread and pwrite basic test") { { coro_io::coro_file file{}; async_simple::coro::syncAwait( - file.async_open(filename.data(), coro_io::flags::read_write)); + file.async_open(filename.data(), coro_io::flags::read_write, + coro_io::read_type::uring_random)); + CHECK(file.is_open()); + + std::string buf = "cccccccccc"; + auto ec = async_simple::coro::syncAwait( + file.async_write_at(0, buf.data(), buf.size())); + CHECK(!ec); + + std::string buf1 = "dddddddddd"; + ec = async_simple::coro::syncAwait( + file.async_write_at(10, buf1.data(), buf1.size())); + CHECK(!ec); + + char buf2[100]; + auto pair = async_simple::coro::syncAwait(file.async_read_at(0, buf2, 10)); + CHECK(!file.eof()); + CHECK(std::string_view(buf2, pair.second) == "cccccccccc"); + + pair = async_simple::coro::syncAwait(file.async_read_at(10, buf2, 10)); + CHECK(!file.eof()); + CHECK(std::string_view(buf2, pair.second) == "dddddddddd"); + } +#endif + + { + coro_io::coro_file file{}; + async_simple::coro::syncAwait(file.async_open(filename.data(), + coro_io::flags::read_write, + coro_io::read_type::pread)); CHECK(file.is_open()); std::string buf = "cccccccccc"; auto ec = async_simple::coro::syncAwait( - file.async_write(0, buf.data(), buf.size())); + file.async_pwrite(0, buf.data(), buf.size())); CHECK(!ec); std::string buf1 = "dddddddddd"; ec = async_simple::coro::syncAwait( - file.async_write(10, buf1.data(), buf1.size())); + file.async_pwrite(10, buf1.data(), buf1.size())); CHECK(!ec); char buf2[100]; - auto pair = async_simple::coro::syncAwait(file.async_read(0, buf2, 10)); + auto pair = async_simple::coro::syncAwait(file.async_pread(0, buf2, 10)); CHECK(!file.eof()); CHECK(std::string_view(buf2, pair.second) == "cccccccccc"); - pair = async_simple::coro::syncAwait(file.async_read(10, buf2, 10)); + pair = async_simple::coro::syncAwait(file.async_pread(10, buf2, 10)); CHECK(!file.eof()); CHECK(std::string_view(buf2, pair.second) == "dddddddddd"); } } -#else +#endif + async_simple::coro::Lazy test_basic_read(std::string filename) { coro_io::coro_file file{}; co_await file.async_open(filename.data(), coro_io::flags::read_only); @@ -815,7 +871,6 @@ TEST_CASE("large_file_write_with_pool_test") { file.close(); fs::remove(fs::path(filename)); } -#endif DOCTEST_MSVC_SUPPRESS_WARNING_WITH_PUSH(4007) int main(int argc, char** argv) { return doctest::Context(argc, argv).run(); }