diff --git a/include/ylt/coro_io/coro_file.hpp b/include/ylt/coro_io/coro_file.hpp index 1254ad620..c6ecf9ee2 100644 --- a/include/ylt/coro_io/coro_file.hpp +++ b/include/ylt/coro_io/coro_file.hpp @@ -14,10 +14,15 @@ * limitations under the License. */ #pragma once - #include #include #include + +#include +#include +#include + +#include "io_context_pool.hpp" #if defined(YLT_ENABLE_FILE_IO_URING) #include #include @@ -25,11 +30,8 @@ #include #include -#include #include #include -#include -#include #include #include #include @@ -39,69 +41,90 @@ #include #include "coro_io.hpp" -#include "io_context_pool.hpp" namespace coro_io { -#if defined(YLT_ENABLE_FILE_IO_URING) -inline asio::file_base::flags default_flags() { - return asio::stream_file::read_write | asio::stream_file::append | - asio::stream_file::create; -} -#endif -enum class open_mode { read, write }; +/* + ┌─────────────┬───────────────────────────────┐ + │fopen() mode │ open() flags │ + ├─────────────┼───────────────────────────────┤ + │ r │ O_RDONLY │ + ├─────────────┼───────────────────────────────┤ + │ w │ O_WRONLY | O_CREAT | O_TRUNC │ + ├─────────────┼───────────────────────────────┤ + │ a │ O_WRONLY | O_CREAT | O_APPEND │ + ├─────────────┼───────────────────────────────┤ + │ r+ │ O_RDWR │ + ├─────────────┼───────────────────────────────┤ + │ w+ │ O_RDWR | O_CREAT | O_TRUNC │ + ├─────────────┼───────────────────────────────┤ + │ a+ │ O_RDWR | O_CREAT | O_APPEND │ + └─────────────┴───────────────────────────────┘ +*/ +enum flags { +#if defined(ASIO_WINDOWS) + read_only = 1, + write_only = 2, + read_write = 4, + append = 8, + create = 16, + exclusive = 32, + truncate = 64, + create_write = create | write_only, + create_write_trunc = create | write_only | truncate, + create_read_write_trunc = read_write | create | truncate, + create_read_write_append = read_write | create | append, + sync_all_on_write = 128 +#else // defined(ASIO_WINDOWS) + read_only = O_RDONLY, + write_only = O_WRONLY, + read_write = O_RDWR, + append = O_APPEND, + create = O_CREAT, + exclusive = O_EXCL, + truncate = O_TRUNC, + create_write = O_CREAT | O_WRONLY, + create_write_trunc = O_WRONLY | O_CREAT | O_TRUNC, + create_read_write_trunc = O_RDWR | O_CREAT | O_TRUNC, + create_read_write_append = O_RDWR | O_CREAT | O_APPEND, + sync_all_on_write = O_SYNC +#endif // defined(ASIO_WINDOWS) +}; class coro_file { public: #if defined(YLT_ENABLE_FILE_IO_URING) coro_file( - std::string_view filepath, open_mode flags = open_mode::read, coro_io::ExecutorWrapper<>* executor = coro_io::get_global_executor()) - : coro_file(filepath, flags, executor->get_asio_executor()) {} - - coro_file(std::string_view filepath, open_mode flags, - asio::io_context::executor_type executor) { - try { - stream_file_ = std::make_unique(executor); - } catch (std::exception& ex) { - std::cout << ex.what() << "\n"; - return; - } + : coro_file(executor->get_asio_executor()) {} - std::error_code ec; - stream_file_->open(filepath.data(), default_flags(), ec); - if (ec) { - std::cout << ec.message() << "\n"; - } - } + coro_file(asio::io_context::executor_type executor) + : executor_wrapper_(executor) {} #else - coro_file(std::string_view filepath, open_mode flags = open_mode::read, - coro_io::ExecutorWrapper<>* executor = + coro_file(coro_io::ExecutorWrapper<>* executor = coro_io::get_global_block_executor()) - : coro_file(filepath, flags, executor->get_asio_executor()) {} - - coro_file(std::string_view filepath, open_mode flags, - asio::io_context::executor_type executor) - : executor_wrapper_(executor) { - std::ios::openmode open_flags = flags == open_mode::read - ? std::ios::binary | std::ios::in - : std::ios::out | std::ios::app; - stream_file_ = std::make_unique( - std::filesystem::path(filepath), open_flags); - if (!stream_file_->is_open()) { - std::cout << "open file " << filepath << " failed " - << "\n"; - stream_file_.reset(); - } - } + : coro_file(executor->get_asio_executor()) {} + + coro_file(asio::io_context::executor_type executor) + : executor_wrapper_(executor) {} #endif - bool is_open() { + bool is_open() { return stream_file_ != nullptr; } + + void flush() { #if defined(YLT_ENABLE_FILE_IO_URING) - return stream_file_ && stream_file_->is_open(); + +#else + if (stream_file_) { + auto fptr = stream_file_.get(); +#if defined(__GNUC__) and defined(USE_PREAD_WRITE) + int fd = *stream_file_; + fsync(fd); #else - return stream_file_ && stream_file_->is_open(); + fflush(fptr); +#endif + } #endif } @@ -116,6 +139,37 @@ class coro_file { } #if defined(YLT_ENABLE_FILE_IO_URING) + async_simple::coro::Lazy async_open(std::string_view filepath, + int open_mode = flags::read_write) { + try { + stream_file_ = std::make_unique( + executor_wrapper_.get_asio_executor()); + } catch (std::exception& ex) { + std::cout << ex.what() << "\n"; + co_return false; + } + + std::error_code ec; + stream_file_->open(filepath.data(), + static_cast(open_mode), ec); + if (ec) { + std::cout << ec.message() << "\n"; + co_return false; + } + + co_return true; + } + + bool seek(long offset, int whence) { + std::error_code seek_ec; + stream_file_->seek(offset, static_cast(whence), + seek_ec); + if (seek_ec) { + return false; + } + return true; + } + async_simple::coro::Lazy> async_read( char* data, size_t size) { size_t left_size = size; @@ -185,59 +239,148 @@ class coro_file { co_return std::error_code{}; } #else + std::string str_mode(int open_mode) { + switch (open_mode) { + case flags::read_only: + return "r"; + case flags::create_write: + case flags::write_only: + return "w"; + case flags::read_write: + return "r+"; + case flags::append: + return "a"; + case flags::create_read_write_append: + return "a+"; + case flags::truncate: + return "w+"; + default: + return "r+"; + } + } + +#if defined(__GNUC__) and defined(USE_PREAD_WRITE) + async_simple::coro::Lazy async_open(std::string filepath, + int open_mode = flags::read_write) { + if (stream_file_) { + co_return true; + } + + int fd = open(filepath.data(), open_mode); + if (fd < 0) { + co_return false; + } + + stream_file_ = std::shared_ptr(new int(fd), [](int* ptr) { + ::close(*ptr); + delete ptr; + }); + + co_return true; + } + + async_simple::coro::Lazy> async_prw( + auto io_func, bool is_read, size_t offset, char* buf, size_t size) { + std::function func = [=, this] { + int fd = *stream_file_; + return io_func(fd, buf, size, offset); + }; + + std::error_code ec{}; + size_t op_size = 0; + + auto len_val = co_await coro_io::post(std::move(func), &executor_wrapper_); + int len = len_val.value(); + if (len == 0) { + if (is_read) { + eof_ = true; + } + } + else if (len > 0) { + op_size = len; + } + else { + ec = std::make_error_code(std::errc::io_error); + } + + co_return std::make_pair(ec, op_size); + } + async_simple::coro::Lazy> async_read( - char* data, size_t size) { - async_simple::Promise> promise; - async_read_impl(data, size) - .via(&executor_wrapper_) - .start([&promise](auto&& t) { - if (t.available()) { - promise.setValue(t.value()); + size_t offset, char* data, size_t size) { + co_return co_await async_prw(pread, true, offset, data, size); + } + + async_simple::coro::Lazy async_write(size_t offset, + const char* data, + size_t size) { + auto result = co_await async_prw(pwrite, false, offset, (char*)data, size); + co_return result.first; + } +#else + bool seek(long offset, int whence) { + return fseek(stream_file_.get(), offset, whence) == 0; + } + + async_simple::coro::Lazy async_open(std::string filepath, + int open_mode = flags::read_write) { + if (stream_file_ != nullptr) { + co_return true; + } + + auto result = co_await coro_io::post( + [this, &filepath, open_mode] { + auto fptr = fopen(filepath.data(), str_mode(open_mode).data()); + if (fptr == nullptr) { + std::cout << "open file " << filepath << " failed " + << "\n"; + return false; } - else { - promise.setValue(std::make_pair( - std::make_error_code(std::errc::io_error), size_t(0))); + stream_file_ = std::shared_ptr(fptr, [](FILE* ptr) { + fclose(ptr); + }); + return true; + }, + &executor_wrapper_); + co_return result.value(); + } + + async_simple::coro::Lazy> async_read( + char* data, size_t size) { + auto result = co_await coro_io::post( + [this, data, size] { + auto fptr = stream_file_.get(); + size_t read_size = fread(data, sizeof(char), size, fptr); + if (ferror(fptr)) { + return std::pair( + std::make_error_code(std::errc::io_error), 0); } - }); + eof_ = feof(fptr); + return std::pair(std::error_code{}, + read_size); + }, + &executor_wrapper_); - co_return co_await promise.getFuture(); + co_return result.value(); } async_simple::coro::Lazy async_write(const char* data, size_t size) { - async_simple::Promise promise; - async_write_impl(data, size) - .via(&executor_wrapper_) - .start([&promise](auto&& t) { - if (t.available()) { - promise.setValue(t.value()); + auto result = co_await coro_io::post( + [this, data, size] { + auto fptr = stream_file_.get(); + fwrite(data, sizeof(char), size, fptr); + if (ferror(fptr)) { + return std::make_error_code(std::errc::io_error); } - else { - promise.setValue(std::make_error_code(std::errc::io_error)); - } - }); + return std::error_code{}; + }, + &executor_wrapper_); - co_return co_await promise.getFuture(); - } - - private: - async_simple::coro::Lazy> async_read_impl( - char* data, size_t size) { - stream_file_->read(data, size); - size_t read_size = stream_file_->gcount(); - if (!stream_file_ && read_size == 0) { - co_return std::make_pair(std::make_error_code(std::errc::io_error), 0); - } - eof_ = stream_file_->eof(); - co_return std::make_pair(std::error_code{}, read_size); + co_return result.value(); } +#endif - async_simple::coro::Lazy async_write_impl(const char* data, - size_t size) { - stream_file_->write(data, size); - stream_file_->flush(); - co_return std::error_code{}; - } #endif private: @@ -245,10 +388,15 @@ class coro_file { std::unique_ptr stream_file_; std::atomic seek_offset_ = 0; #else - std::unique_ptr stream_file_; - coro_io::ExecutorWrapper<> executor_wrapper_; + +#if defined(__GNUC__) and defined(USE_PREAD_WRITE) + std::shared_ptr stream_file_; +#else + std::shared_ptr stream_file_; #endif +#endif + coro_io::ExecutorWrapper<> executor_wrapper_; std::atomic eof_ = false; }; -} // namespace coro_io \ No newline at end of file +} // namespace coro_io diff --git a/include/ylt/thirdparty/cinatra/coro_http_client.hpp b/include/ylt/thirdparty/cinatra/coro_http_client.hpp index 9ccff8080..047bbadfb 100644 --- a/include/ylt/thirdparty/cinatra/coro_http_client.hpp +++ b/include/ylt/thirdparty/cinatra/coro_http_client.hpp @@ -104,6 +104,12 @@ struct multipart_t { size_t size = 0; }; +struct read_result { + std::string_view buf; + bool eof; + std::error_code err; +}; + class coro_http_client : public std::enable_shared_from_this { public: struct config { @@ -681,8 +687,8 @@ class coro_http_client : public std::enable_shared_from_this { std::string filename, std::string range = "") { resp_data data{}; - auto file = std::make_shared(filename, - coro_io::open_mode::write); + auto file = std::make_shared(); + co_await file->async_open(filename, coro_io::flags::create_write); if (!file->is_open()) { data.net_err = std::make_error_code(std::errc::no_such_file_or_directory); data.status = 404; @@ -751,9 +757,9 @@ class coro_http_client : public std::enable_shared_from_this { std::string_view get_port() { return port_; } - template + template async_simple::coro::Lazy async_upload_chunked( - S uri, http_method method, File file, + S uri, http_method method, Source source, req_content_type content_type = req_content_type::text, std::unordered_map headers = {}) { std::shared_ptr guard(nullptr, [this](auto) { @@ -762,6 +768,10 @@ class coro_http_client : public std::enable_shared_from_this { } }); + if (!resp_chunk_str_.empty()) { + resp_chunk_str_.clear(); + } + req_context<> ctx{content_type}; resp_data data{}; auto [ok, u] = handle_uri(data, uri); @@ -769,15 +779,16 @@ class coro_http_client : public std::enable_shared_from_this { co_return resp_data{{}, 404}; } - constexpr bool is_stream_file = is_stream_ptr_v; + constexpr bool is_stream_file = is_stream_ptr_v; if constexpr (is_stream_file) { - if (!file) { + if (!source) { co_return resp_data{ std::make_error_code(std::errc::no_such_file_or_directory), 404}; } } - else { - if (!std::filesystem::exists(file)) { + else if constexpr (std::is_same_v || + std::is_same_v) { + if (!std::filesystem::exists(source)) { co_return resp_data{ std::make_error_code(std::errc::no_such_file_or_directory), 404}; } @@ -817,18 +828,19 @@ class coro_http_client : public std::enable_shared_from_this { std::string chunk_size_str; if constexpr (is_stream_file) { - while (!file->eof()) { + while (!source->eof()) { size_t rd_size = - file->read(file_data.data(), file_data.size()).gcount(); + source->read(file_data.data(), file_data.size()).gcount(); auto bufs = cinatra::to_chunked_buffers( - file_data.data(), rd_size, chunk_size_str, file->eof()); + file_data.data(), rd_size, chunk_size_str, source->eof()); if (std::tie(ec, size) = co_await async_write(bufs); ec) { co_return resp_data{ec, 404}; } } } - else { - coro_io::coro_file coro_file(file, coro_io::open_mode::read); + else if constexpr (std::is_same_v || + std::is_same_v) { + coro_io::coro_file coro_file(source, coro_io::flags::read_only); while (!coro_file.eof()) { auto [rd_ec, rd_size] = co_await coro_file.async_read(file_data.data(), file_data.size()); @@ -839,6 +851,20 @@ class coro_http_client : public std::enable_shared_from_this { } } } + else { + std::string chunk_size_str; + while (true) { + auto result = co_await source(); + auto bufs = cinatra::to_chunked_buffers( + result.buf.data(), result.buf.size(), chunk_size_str, result.eof); + if (std::tie(ec, size) = co_await async_write(bufs); ec) { + co_return resp_data{ec, 404}; + } + if (result.eof) { + break; + } + } + } bool is_keep_alive = true; data = co_await handle_read(ec, size, is_keep_alive, std::move(ctx), @@ -1546,10 +1572,11 @@ class coro_http_client : public std::enable_shared_from_this { } if (is_file) { - coro_io::coro_file file(part.filename, coro_io::open_mode::read); + coro_io::coro_file file{}; + co_await file.async_open(part.filename, coro_io::flags::read_only); assert(file.is_open()); std::string file_data; - file_data.resize(max_single_part_size_); + detail::resize(file_data, max_single_part_size_); while (!file.eof()) { auto [rd_ec, rd_size] = co_await file.async_read(file_data.data(), file_data.size()); diff --git a/src/coro_io/examples/main.cpp b/src/coro_io/examples/main.cpp index 5808b8c8b..ad23959b1 100644 --- a/src/coro_io/examples/main.cpp +++ b/src/coro_io/examples/main.cpp @@ -49,8 +49,9 @@ void test_read_file() { ioc.run(); }); - coro_io::coro_file file(filename, coro_io::open_mode::read, - ioc.get_executor()); + coro_io::coro_file file{}; + async_simple::coro::syncAwait( + file.async_open(filename, coro_io::flags::read_only)); bool r = file.is_open(); if (!file.is_open()) { return; @@ -84,8 +85,10 @@ void test_write_and_read_file() { ioc.run(); }); - coro_io::coro_file file(filename, coro_io::open_mode::write, - ioc.get_executor()); + coro_io::coro_file file{ioc.get_executor()}; + async_simple::coro::syncAwait( + file.async_open(filename, coro_io::flags::create_write)); + bool r = file.is_open(); if (!file.is_open()) { return; @@ -106,8 +109,10 @@ void test_write_and_read_file() { std::cout << ec.message() << "\n"; } - coro_io::coro_file file1(filename, coro_io::open_mode::read, - ioc.get_executor()); + coro_io::coro_file file1{ioc.get_executor()}; + async_simple::coro::syncAwait( + file1.async_open(filename, coro_io::flags::read_only)); + r = file1.is_open(); if (!file1.is_open()) { return; @@ -134,7 +139,9 @@ void test_read_with_pool() { std::string filename = "test1.txt"; create_temp_file("test1.txt", 1024); - coro_io::coro_file file(filename); + coro_io::coro_file file{}; + async_simple::coro::syncAwait(file.async_open(filename)); + bool r = file.is_open(); if (!file.is_open()) { return; @@ -155,7 +162,10 @@ void test_read_with_pool() { } std::string str = "test async write"; - coro_io::coro_file file1(filename, coro_io::open_mode::write); + coro_io::coro_file file1{}; + async_simple::coro::syncAwait( + file1.async_open(filename, coro_io::flags::create_write)); + r = file1.is_open(); if (!file1.is_open()) { return; @@ -171,7 +181,10 @@ void test_write_with_pool() { std::string filename = "test1.txt"; create_temp_file("test1.txt", 10); - coro_io::coro_file file(filename, coro_io::open_mode::write); + coro_io::coro_file file{}; + async_simple::coro::syncAwait( + file.async_open(filename, coro_io::flags::create_write)); + bool r = file.is_open(); if (!file.is_open()) { return; diff --git a/src/coro_io/tests/test_corofile.cpp b/src/coro_io/tests/test_corofile.cpp index 9921c92d9..fad8d4f03 100644 --- a/src/coro_io/tests/test_corofile.cpp +++ b/src/coro_io/tests/test_corofile.cpp @@ -61,116 +61,201 @@ void create_file(std::string filename, size_t file_size, return; } -// TODO: will revert later. -// TEST_CASE("multithread for balance") { -// size_t total = 100; -// std::vector filenames; -// for (size_t i = 0; i < total; ++i) { -// filenames.push_back("temp" + std::to_string(i + 1)); -// } -// -// std::vector write_str_vec; -// char ch = 'a'; -// for (int i = 0; i < 26; ++i) { -// std::string str(100, ch + i); -// write_str_vec.push_back(std::move(str)); -// } -// -// std::vector> write_vec; -// auto write_file_func = -// [&write_str_vec](std::string filename, -// int index) mutable -> async_simple::coro::Lazy { -// coro_io::coro_file file(filename, coro_io::open_mode::write, -// coro_io::get_global_block_executor< -// coro_io::multithread_context_pool>()); -// CHECK(file.is_open()); -// -// size_t id = index % write_str_vec.size(); -// auto& str = write_str_vec[id]; -// auto ec = co_await file.async_write(str.data(), str.size()); -// CHECK(!ec); -// co_return; -// }; -// -// for (size_t i = 0; i < total; ++i) { -// write_vec.push_back(write_file_func(filenames[i], i)); -// } -// -// auto wait_func = -// [write_vec = -// std::move(write_vec)]() mutable -> async_simple::coro::Lazy -// { -// co_await async_simple::coro::collectAll(std::move(write_vec)); -// }; -// -// async_simple::coro::syncAwait(wait_func()); -// -// std::cout << "write finished\n"; -// -// // read and compare -// std::vector> read_vec; -// -// auto read_file_func = -// [&write_str_vec](std::string filename, -// int index) mutable -> async_simple::coro::Lazy { -// coro_io::coro_file file(filename, coro_io::open_mode::read, -// coro_io::get_global_block_executor< -// coro_io::multithread_context_pool>()); -// CHECK(file.is_open()); -// -// size_t id = index % write_str_vec.size(); -// auto& str = write_str_vec[id]; -// std::string buf; -// buf.resize(write_str_vec.back().size()); -// -// std::error_code ec; -// size_t read_size; -// std::tie(ec, read_size) = co_await file.async_read(buf.data(), -// buf.size()); CHECK(!ec); bool ok = (str == buf); if (!ok) { -// std::cout << "str: " << str << "\n"; -// std::cout << "read buf: " << buf << "\n"; -// } -// CHECK(ok); -// co_return; -// }; -// -// for (size_t i = 0; i < total; ++i) { -// read_vec.push_back(read_file_func(filenames[i], i)); -// } -// -// auto wait_read_func = -// [read_vec = -// std::move(read_vec)]() mutable -> async_simple::coro::Lazy { -// co_await async_simple::coro::collectAll(std::move(read_vec)); -// }; -// -// async_simple::coro::syncAwait(wait_read_func()); -// std::cout << "read finished\n"; -// -// std::error_code ec; -// for (auto& filename : filenames) { -// fs::remove(fs::path(filename), ec); -// if (ec) { -// std::cout << "remove file error: " << ec.message() << "\n"; -// } -// } -// } - -async_simple::coro::Lazy foo() { co_return; } - -TEST_CASE("test currentThreadInExecutor") { - CHECK(*coro_io::get_current() == nullptr); - CHECK(coro_io::get_global_executor()->currentContextId() == 0); - CHECK_NOTHROW( - async_simple::coro::syncAwait(foo().via(coro_io::get_global_executor()))); - auto executor = coro_io::get_global_executor(); - - foo().via(executor).start([executor](auto&&) { - auto ptr = &executor->get_asio_executor().context(); - CHECK(ptr == *coro_io::get_current()); - size_t id = executor->currentContextId(); - CHECK(id > 0); - }); +void create_files(const std::vector& files, size_t file_size) { + std::string content(file_size, 'A'); + for (auto& filename : files) { + std::ofstream out(filename, std::ios::binary); + out.write(content.data(), content.size()); + } +} + +#if defined(__GNUC__) and defined(USE_PREAD_WRITE) +TEST_CASE("coro_file pread and pwrite basic test") { + std::string filename = "test.tmp"; + create_files({filename}, 190); + { + coro_io::coro_file file{}; + async_simple::coro::syncAwait( + file.async_open(filename.data(), coro_io::flags::read_only)); + CHECK(file.is_open()); + + char buf[100]; + auto pair = async_simple::coro::syncAwait(file.async_read(0, buf, 10)); + CHECK(std::string_view(buf, pair.second) == "AAAAAAAAAA"); + CHECK(!file.eof()); + + pair = async_simple::coro::syncAwait(file.async_read(10, buf, 100)); + CHECK(!file.eof()); + CHECK(pair.second == 100); + + pair = async_simple::coro::syncAwait(file.async_read(110, buf, 100)); + CHECK(!file.eof()); + CHECK(pair.second == 80); + + // only read size equal 0 is eof. + pair = async_simple::coro::syncAwait(file.async_read(200, buf, 100)); + CHECK(file.eof()); + CHECK(pair.second == 0); + } + + { + coro_io::coro_file file{}; + async_simple::coro::syncAwait( + file.async_open(filename.data(), coro_io::flags::read_write)); + CHECK(file.is_open()); + + std::string buf = "cccccccccc"; + auto ec = async_simple::coro::syncAwait( + file.async_write(0, buf.data(), buf.size())); + CHECK(!ec); + + std::string buf1 = "dddddddddd"; + ec = async_simple::coro::syncAwait( + file.async_write(10, buf1.data(), buf1.size())); + CHECK(!ec); + + char buf2[100]; + auto pair = async_simple::coro::syncAwait(file.async_read(0, buf2, 10)); + CHECK(!file.eof()); + CHECK(std::string_view(buf2, pair.second) == "cccccccccc"); + + pair = async_simple::coro::syncAwait(file.async_read(10, buf2, 10)); + CHECK(!file.eof()); + CHECK(std::string_view(buf2, pair.second) == "dddddddddd"); + } +} +#else +async_simple::coro::Lazy test_basic_read(std::string filename) { + coro_io::coro_file file{}; + co_await file.async_open(filename.data(), coro_io::flags::read_only); + std::string str; + str.resize(200); + + { + auto [ec, size] = co_await file.async_read(str.data(), 10); + std::cout << size << ", " << file.eof() << "\n"; + } + { + bool ok = file.seek(10, SEEK_CUR); + std::cout << ok << "\n"; + } + { + auto [ec, size] = co_await file.async_read(str.data(), str.size()); + std::cout << size << ", " << file.eof() << "\n"; + } +} + +async_simple::coro::Lazy test_basic_write(std::string filename) { + coro_io::coro_file file{}; + co_await file.async_open(filename.data(), coro_io::flags::read_write); + std::string str = "hello"; + + { + auto ec = co_await file.async_write(str.data(), str.size()); + std::string result; + result.resize(10); + file.seek(0, SEEK_SET); + auto [rd_ec, size] = co_await file.async_read(result.data(), 5); + std::string_view s(result.data(), size); + CHECK(s == "hello"); + } + { + bool ok = file.seek(10, SEEK_SET); + auto ec = co_await file.async_write(str.data(), str.size()); + file.seek(10, SEEK_SET); + std::string result; + result.resize(10); + auto [rd_ec, size] = co_await file.async_read(result.data(), 5); + std::string_view s(result.data(), size); + CHECK(s == "hello"); + + std::cout << ec << "\n"; + } +} + +TEST_CASE("multithread for balance") { + size_t total = 100; + std::vector filenames; + for (size_t i = 0; i < total; ++i) { + filenames.push_back("temp" + std::to_string(i + 1)); + } + + std::vector write_str_vec; + char ch = 'a'; + for (int i = 0; i < 26; ++i) { + std::string str(100, ch + i); + write_str_vec.push_back(std::move(str)); + } + + std::vector > write_vec; + auto write_file_func = + [&write_str_vec](std::string filename, + int index) mutable -> async_simple::coro::Lazy { + coro_io::coro_file file(coro_io::get_global_block_executor< + coro_io::multithread_context_pool>()); + async_simple::coro::syncAwait( + file.async_open(filename, coro_io::flags::create_write)); + CHECK(file.is_open()); + + size_t id = index % write_str_vec.size(); + auto& str = write_str_vec[id]; + auto ec = co_await file.async_write(str.data(), str.size()); + CHECK(!ec); + co_return; + }; + + for (size_t i = 0; i < total; ++i) { + write_vec.push_back(write_file_func(filenames[i], i)); + } + + auto wait_func = + [write_vec = + std::move(write_vec)]() mutable -> async_simple::coro::Lazy { + co_await async_simple::coro::collectAll(std::move(write_vec)); + }; + + async_simple::coro::syncAwait(wait_func()); + + // read and compare + std::vector > read_vec; + + auto read_file_func = + [&write_str_vec](std::string filename, + int index) mutable -> async_simple::coro::Lazy { + coro_io::coro_file file(coro_io::get_global_block_executor< + coro_io::multithread_context_pool>()); + async_simple::coro::syncAwait( + file.async_open(filename, coro_io::flags::read_only)); + CHECK(file.is_open()); + + size_t id = index % write_str_vec.size(); + auto& str = write_str_vec[id]; + std::string buf; + buf.resize(write_str_vec.back().size()); + + std::error_code ec; + size_t read_size; + std::tie(ec, read_size) = co_await file.async_read(buf.data(), buf.size()); + CHECK(!ec); + CHECK(str == buf); + co_return; + }; + + for (size_t i = 0; i < total; ++i) { + read_vec.push_back(read_file_func(filenames[i], i)); + } + + auto wait_read_func = + [read_vec = + std::move(read_vec)]() mutable -> async_simple::coro::Lazy { + co_await async_simple::coro::collectAll(std::move(read_vec)); + }; + + async_simple::coro::syncAwait(wait_read_func()); + + for (auto& filename : filenames) { + fs::remove(fs::path(filename)); + } } TEST_CASE("read write 100 small files") { @@ -192,13 +277,15 @@ TEST_CASE("read write 100 small files") { write_str_vec.push_back(std::move(str)); } - std::vector> write_vec; + std::vector > write_vec; auto write_file_func = [&pool, &write_str_vec]( std::string filename, int index) mutable -> async_simple::coro::Lazy { - coro_io::coro_file file(filename, coro_io::open_mode::write); + coro_io::coro_file file(pool.get_executor()); + async_simple::coro::syncAwait( + file.async_open(filename, coro_io::flags::create_write)); CHECK(file.is_open()); size_t id = index % write_str_vec.size(); @@ -221,13 +308,15 @@ TEST_CASE("read write 100 small files") { async_simple::coro::syncAwait(wait_func()); // read and compare - std::vector> read_vec; + std::vector > read_vec; auto read_file_func = [&pool, &write_str_vec]( std::string filename, int index) mutable -> async_simple::coro::Lazy { - coro_io::coro_file file(filename); + coro_io::coro_file file(pool.get_executor()); + async_simple::coro::syncAwait( + file.async_open(filename, coro_io::flags::read_only)); CHECK(file.is_open()); size_t id = index % write_str_vec.size(); @@ -274,7 +363,9 @@ TEST_CASE("small_file_read_test") { ioc.run(); }); - coro_io::coro_file file(filename); + coro_io::coro_file file(ioc.get_executor()); + async_simple::coro::syncAwait( + file.async_open(filename, coro_io::flags::read_only)); CHECK(file.is_open()); char buf[block_size]{}; @@ -309,7 +400,9 @@ TEST_CASE("large_file_read_test") { ioc.run(); }); - coro_io::coro_file file(filename); + coro_io::coro_file file(ioc.get_executor()); + async_simple::coro::syncAwait( + file.async_open(filename, coro_io::flags::read_only)); CHECK(file.is_open()); char buf[block_size]{}; @@ -344,7 +437,9 @@ TEST_CASE("empty_file_read_test") { ioc.run(); }); - coro_io::coro_file file(filename); + coro_io::coro_file file(ioc.get_executor()); + async_simple::coro::syncAwait( + file.async_open(filename, coro_io::flags::read_only)); CHECK(file.is_open()); char buf[block_size]{}; @@ -375,7 +470,9 @@ TEST_CASE("small_file_read_with_pool_test") { pool.run(); }); - coro_io::coro_file file(filename); + coro_io::coro_file file(pool.get_executor()); + async_simple::coro::syncAwait( + file.async_open(filename, coro_io::flags::read_only)); CHECK(file.is_open()); char buf[block_size]{}; @@ -409,7 +506,9 @@ TEST_CASE("large_file_read_with_pool_test") { pool.run(); }); - coro_io::coro_file file(filename); + coro_io::coro_file file(pool.get_executor()); + async_simple::coro::syncAwait( + file.async_open(filename, coro_io::flags::read_only)); CHECK(file.is_open()); char buf[block_size]{}; @@ -441,7 +540,9 @@ TEST_CASE("small_file_write_test") { ioc.run(); }); - coro_io::coro_file file(filename, coro_io::open_mode::write); + coro_io::coro_file file(ioc.get_executor()); + async_simple::coro::syncAwait( + file.async_open(filename, coro_io::flags::create_write)); CHECK(file.is_open()); char buf[512]{}; @@ -454,6 +555,8 @@ TEST_CASE("small_file_write_test") { std::cout << ec.message() << "\n"; } + file.flush(); + std::ifstream is(filename, std::ios::binary); if (!is.is_open()) { std::cout << "Failed to open file: " << filename << "\n"; @@ -476,6 +579,7 @@ TEST_CASE("small_file_write_test") { if (ec) { std::cout << ec.message() << "\n"; } + file.flush(); is.open(filename, std::ios::binary); if (!is.is_open()) { std::cout << "Failed to open file: " << filename << "\n"; @@ -505,7 +609,9 @@ TEST_CASE("large_file_write_test") { ioc.run(); }); - coro_io::coro_file file(filename, coro_io::open_mode::write); + coro_io::coro_file file(ioc.get_executor()); + async_simple::coro::syncAwait( + file.async_open(filename, coro_io::flags::create_write)); CHECK(file.is_open()); auto block_vec = create_filled_vec("large_file_write_test"); @@ -526,6 +632,7 @@ TEST_CASE("large_file_write_test") { std::cout << ec.message() << "\n"; } } + file.flush(); CHECK(fs::file_size(filename) == file_size); std::ifstream is(filename, std::ios::binary); if (!is.is_open()) { @@ -557,7 +664,9 @@ TEST_CASE("empty_file_write_test") { ioc.run(); }); - coro_io::coro_file file(filename, coro_io::open_mode::write); + coro_io::coro_file file(ioc.get_executor()); + async_simple::coro::syncAwait( + file.async_open(filename, coro_io::flags::create_write)); CHECK(file.is_open()); char buf[512]{}; @@ -569,7 +678,7 @@ TEST_CASE("empty_file_write_test") { if (ec) { std::cout << ec.message() << "\n"; } - + file.flush(); std::ifstream is(filename, std::ios::binary); if (!is.is_open()) { std::cout << "Failed to open file: " << filename << "\n"; @@ -591,7 +700,9 @@ TEST_CASE("small_file_write_with_pool_test") { pool.run(); }); - coro_io::coro_file file(filename, coro_io::open_mode::write); + coro_io::coro_file file(pool.get_executor()); + async_simple::coro::syncAwait( + file.async_open(filename, coro_io::flags::create_write)); CHECK(file.is_open()); char buf[512]{}; @@ -603,6 +714,7 @@ TEST_CASE("small_file_write_with_pool_test") { if (ec) { std::cout << ec.message() << "\n"; } + file.flush(); std::ifstream is(filename, std::ios::binary); if (!is.is_open()) { @@ -626,6 +738,7 @@ TEST_CASE("small_file_write_with_pool_test") { if (ec) { std::cout << ec.message() << "\n"; } + file.flush(); is.open(filename, std::ios::binary); if (!is.is_open()) { std::cout << "Failed to open file: " << filename << "\n"; @@ -654,7 +767,9 @@ TEST_CASE("large_file_write_with_pool_test") { pool.run(); }); - coro_io::coro_file file(filename, coro_io::open_mode::write); + coro_io::coro_file file(pool.get_executor()); + async_simple::coro::syncAwait( + file.async_open(filename, coro_io::flags::create_write)); CHECK(file.is_open()); auto block_vec = create_filled_vec("large_file_write_with_pool_test"); @@ -675,7 +790,9 @@ TEST_CASE("large_file_write_with_pool_test") { std::cout << ec.message() << "\n"; } } - CHECK(fs::file_size(filename) == file_size); + file.flush(); + size_t sz = fs::file_size(filename); + CHECK(sz == file_size); std::ifstream is(filename, std::ios::binary); if (!is.is_open()) { std::cout << "Failed to open file: " << filename << "\n"; @@ -698,3 +815,4 @@ TEST_CASE("large_file_write_with_pool_test") { file.close(); fs::remove(fs::path(filename)); } +#endif