From 08640964b188b2517d8474adec5ae872c168490f Mon Sep 17 00:00:00 2001 From: JaySon-Huang Date: Fri, 26 Dec 2025 15:09:23 +0800 Subject: [PATCH 1/6] Use read buffer Signed-off-by: JaySon-Huang --- dbms/src/Storages/S3/FileCache.cpp | 34 ++++++++++++++++++------------ 1 file changed, 21 insertions(+), 13 deletions(-) diff --git a/dbms/src/Storages/S3/FileCache.cpp b/dbms/src/Storages/S3/FileCache.cpp index c7a20323e87..eca61df3fe7 100644 --- a/dbms/src/Storages/S3/FileCache.cpp +++ b/dbms/src/Storages/S3/FileCache.cpp @@ -21,7 +21,9 @@ #include #include #include +#include #include +#include #include #include #include @@ -32,9 +34,11 @@ #include #include #include +#include #include #include +#include #include #include #include @@ -1009,22 +1013,26 @@ void FileCache::downloadImpl(const String & s3_key, FileSegmentPtr & file_seg, c prepareParentDir(local_fname); auto temp_fname = toTemporaryFilename(local_fname); { - Aws::OFStream ostr(temp_fname, std::ios_base::out | std::ios_base::binary); - RUNTIME_CHECK_MSG(ostr.is_open(), "Open {} failed: {}", temp_fname, strerror(errno)); + PosixWritableFile ofile(temp_fname, true, O_CREAT | O_WRONLY, 0666, write_limiter); + if (content_length > 0) { - if (write_limiter) - write_limiter->request(content_length); GET_METRIC(tiflash_storage_remote_cache_bytes, type_dtfile_download_bytes).Increment(content_length); - ostr << result.GetBody().rdbuf(); - // If content_length == 0, ostr.good() is false. Does not know the reason. - RUNTIME_CHECK_MSG( - ostr.good(), - "Write {} content_length {} failed: {}", - temp_fname, - content_length, - strerror(errno)); - ostr.flush(); + ReadBufferFromIStream rbuf(result.GetBody(), std::min(content_length, static_cast(16 * 1024))); + ssize_t write_res = 0; + while (!rbuf.eof()) + { + size_t count = rbuf.buffer().end() - rbuf.position(); + if (write_res = ofile.write(rbuf.position(), count); write_res < 0) + { + throwFromErrno(fmt::format("write to file failed, fname={}", temp_fname), write_res, errno); + } + rbuf.position() += count; + } + if (auto res = ofile.fsync(); res < 0) + { + throwFromErrno(fmt::format("fsync file failed, fname={}", temp_fname), res, errno); + } } } std::filesystem::rename(temp_fname, local_fname); From 2bc6ff5cf14f4159ee9b326740016b5022dc24cd Mon Sep 17 00:00:00 2001 From: JaySon-Huang Date: Wed, 31 Dec 2025 10:21:51 +0800 Subject: [PATCH 2/6] Extract func Signed-off-by: JaySon-Huang --- dbms/src/Storages/S3/FileCache.cpp | 67 +++++++++++++++++++----------- 1 file changed, 42 insertions(+), 25 deletions(-) diff --git a/dbms/src/Storages/S3/FileCache.cpp b/dbms/src/Storages/S3/FileCache.cpp index eca61df3fe7..2c3830c9a02 100644 --- a/dbms/src/Storages/S3/FileCache.cpp +++ b/dbms/src/Storages/S3/FileCache.cpp @@ -32,6 +32,7 @@ #include #include #include +#include #include #include #include @@ -984,6 +985,43 @@ bool FileCache::finalizeReservedSize(FileType reserve_for, UInt64 reserved_size, return true; } +size_t downloadToLocal( + Aws::IOStream & istr, + const String & fname, + Int64 content_length, + const WriteLimiterPtr & write_limiter) +{ + // create an empty file with write_limiter + // each time `ofile.write` is called, the write speed will be controlled by the write_limiter. + PosixWritableFile ofile(fname, true, O_CREAT | O_WRONLY, 0666, write_limiter); + // simply create an empty file + if (unlikely(content_length <= 0)) + return 0; + + GET_METRIC(tiflash_storage_remote_cache_bytes, type_dtfile_download_bytes).Increment(content_length); + size_t total_written = 0; + static const Int64 MAX_BUFFER_SIZE = 16 * 1024; // 16k + ReadBufferFromIStream rbuf(istr, std::min(content_length, MAX_BUFFER_SIZE)); + while (!rbuf.eof()) + { + size_t count = rbuf.buffer().end() - rbuf.position(); + if (ssize_t write_res = ofile.write(rbuf.position(), count); write_res < 0) + { + throwFromErrno(fmt::format("write to file failed, fname={}", fname), write_res, errno); + } + else + { + total_written += write_res; + } + rbuf.position() += count; + } + if (auto res = ofile.fsync(); res < 0) + { + throwFromErrno(fmt::format("fsync file failed, fname={}", fname), res, errno); + } + return total_written; +} + void FileCache::downloadImpl(const String & s3_key, FileSegmentPtr & file_seg, const WriteLimiterPtr & write_limiter) { Stopwatch sw; @@ -1011,32 +1049,11 @@ void FileCache::downloadImpl(const String & s3_key, FileSegmentPtr & file_seg, c const auto & local_fname = file_seg->getLocalFileName(); prepareParentDir(local_fname); + // download as a temp file then rename to a formal file auto temp_fname = toTemporaryFilename(local_fname); - { - PosixWritableFile ofile(temp_fname, true, O_CREAT | O_WRONLY, 0666, write_limiter); - - if (content_length > 0) - { - GET_METRIC(tiflash_storage_remote_cache_bytes, type_dtfile_download_bytes).Increment(content_length); - ReadBufferFromIStream rbuf(result.GetBody(), std::min(content_length, static_cast(16 * 1024))); - ssize_t write_res = 0; - while (!rbuf.eof()) - { - size_t count = rbuf.buffer().end() - rbuf.position(); - if (write_res = ofile.write(rbuf.position(), count); write_res < 0) - { - throwFromErrno(fmt::format("write to file failed, fname={}", temp_fname), write_res, errno); - } - rbuf.position() += count; - } - if (auto res = ofile.fsync(); res < 0) - { - throwFromErrno(fmt::format("fsync file failed, fname={}", temp_fname), res, errno); - } - } - } + size_t fsize = downloadToLocal(result.GetBody(), temp_fname, content_length, write_limiter); std::filesystem::rename(temp_fname, local_fname); - auto fsize = std::filesystem::file_size(local_fname); + capacity_metrics->addUsedSize(local_fname, fsize); RUNTIME_CHECK_MSG( fsize == static_cast(content_length), @@ -1045,7 +1062,7 @@ void FileCache::downloadImpl(const String & s3_key, FileSegmentPtr & file_seg, c fsize, content_length); file_seg->setStatus(FileSegment::Status::Complete); - LOG_DEBUG( + LOG_INFO( log, "Download s3_key={} to local={} size={} cost={}ms", s3_key, From ee3f1532a04701648688cd5bd455578909c351b7 Mon Sep 17 00:00:00 2001 From: JaySon-Huang Date: Mon, 5 Jan 2026 22:08:01 +0800 Subject: [PATCH 3/6] Refine the unit tests Signed-off-by: JaySon-Huang --- dbms/src/Storages/S3/FileCache.cpp | 36 +++------ .../src/Storages/S3/tests/gtest_filecache.cpp | 78 ++++++++++--------- 2 files changed, 52 insertions(+), 62 deletions(-) diff --git a/dbms/src/Storages/S3/FileCache.cpp b/dbms/src/Storages/S3/FileCache.cpp index 2c3830c9a02..321e2926973 100644 --- a/dbms/src/Storages/S3/FileCache.cpp +++ b/dbms/src/Storages/S3/FileCache.cpp @@ -24,7 +24,9 @@ #include #include #include +#include #include +#include #include #include #include @@ -39,7 +41,6 @@ #include #include -#include #include #include #include @@ -985,7 +986,7 @@ bool FileCache::finalizeReservedSize(FileType reserve_for, UInt64 reserved_size, return true; } -size_t downloadToLocal( +void downloadToLocal( Aws::IOStream & istr, const String & fname, Int64 content_length, @@ -993,33 +994,17 @@ size_t downloadToLocal( { // create an empty file with write_limiter // each time `ofile.write` is called, the write speed will be controlled by the write_limiter. - PosixWritableFile ofile(fname, true, O_CREAT | O_WRONLY, 0666, write_limiter); + auto ofile = std::make_shared(fname, true, O_CREAT | O_WRONLY, 0666, write_limiter); // simply create an empty file if (unlikely(content_length <= 0)) - return 0; + return; GET_METRIC(tiflash_storage_remote_cache_bytes, type_dtfile_download_bytes).Increment(content_length); - size_t total_written = 0; static const Int64 MAX_BUFFER_SIZE = 16 * 1024; // 16k ReadBufferFromIStream rbuf(istr, std::min(content_length, MAX_BUFFER_SIZE)); - while (!rbuf.eof()) - { - size_t count = rbuf.buffer().end() - rbuf.position(); - if (ssize_t write_res = ofile.write(rbuf.position(), count); write_res < 0) - { - throwFromErrno(fmt::format("write to file failed, fname={}", fname), write_res, errno); - } - else - { - total_written += write_res; - } - rbuf.position() += count; - } - if (auto res = ofile.fsync(); res < 0) - { - throwFromErrno(fmt::format("fsync file failed, fname={}", fname), res, errno); - } - return total_written; + WriteBufferFromWritableFile wbuf(ofile, std::min(content_length, MAX_BUFFER_SIZE)); + copyData(rbuf, wbuf, content_length); + wbuf.sync(); } void FileCache::downloadImpl(const String & s3_key, FileSegmentPtr & file_seg, const WriteLimiterPtr & write_limiter) @@ -1048,11 +1033,12 @@ void FileCache::downloadImpl(const String & s3_key, FileSegmentPtr & file_seg, c file_seg->setSize(content_length); const auto & local_fname = file_seg->getLocalFileName(); - prepareParentDir(local_fname); // download as a temp file then rename to a formal file + prepareParentDir(local_fname); auto temp_fname = toTemporaryFilename(local_fname); - size_t fsize = downloadToLocal(result.GetBody(), temp_fname, content_length, write_limiter); + downloadToLocal(result.GetBody(), temp_fname, content_length, write_limiter); std::filesystem::rename(temp_fname, local_fname); + auto fsize = std::filesystem::file_size(local_fname); capacity_metrics->addUsedSize(local_fname, fsize); RUNTIME_CHECK_MSG( diff --git a/dbms/src/Storages/S3/tests/gtest_filecache.cpp b/dbms/src/Storages/S3/tests/gtest_filecache.cpp index f07ccba87cd..be728835456 100644 --- a/dbms/src/Storages/S3/tests/gtest_filecache.cpp +++ b/dbms/src/Storages/S3/tests/gtest_filecache.cpp @@ -113,7 +113,7 @@ class FileCacheTest : public ::testing::Test } auto r = file.fsync(); ASSERT_EQ(r, 0); - LOG_DEBUG(log, "write fname={} size={} done, cost={}s", key, size, sw.elapsedSeconds()); + LOG_DEBUG(log, "write fname={} size={} done, cost={:.3f}s", key, size, sw.elapsedSeconds()); } void writeS3FileWithSize(const S3Filename & s3_dir, std::string_view file_name, size_t size) @@ -199,14 +199,15 @@ class FileCacheTest : public ::testing::Test { std::this_thread::sleep_for(1000ms); } - LOG_DEBUG( + LOG_INFO( log, - "Download summary: succ={} fail={} cost={}s", + "Download summary: succ={} fail={} cost={:.3f}s", file_cache.bg_download_succ_count.load(std::memory_order_relaxed), file_cache.bg_download_fail_count.load(std::memory_order_relaxed), sw.elapsedSeconds()); } + // Update the config.capacity to make sure dtfile cache capacity equals to `dt_size`. static void calculateCacheCapacity(StorageRemoteCacheConfig & config, UInt64 dt_size) { config.capacity = dt_size / (1.0 - config.delta_rate); @@ -248,18 +249,19 @@ try Stopwatch sw; auto objects = genObjects(/*store_count*/ 1, /*table_count*/ 1, /*file_count*/ 1, basenames); auto total_size = objectsTotalSize(objects); - LOG_DEBUG(log, "genObjects: count={} total_size={} cost={}s", objects.size(), total_size, sw.elapsedSeconds()); + LOG_INFO(log, "genObjects: count={} total_size={} cost={:.3f}s", objects.size(), total_size, sw.elapsedSeconds()); auto cache_dir = fmt::format("{}/file_cache_all", tmp_dir); StorageRemoteCacheConfig cache_config{.dir = cache_dir, .dtfile_level = 100}; calculateCacheCapacity(cache_config, total_size); - LOG_DEBUG(log, "total_size={} dt_cache_capacity={}", total_size, cache_config.getDTFileCapacity()); + LOG_INFO(log, "total_size={} dt_cache_capacity={}", total_size, cache_config.getDTFileCapacity()); UInt16 vcores = 4; IORateLimiter rate_limiter; { - LOG_DEBUG(log, "Cache all data"); + // download all files to local filesystem as cache + LOG_INFO(log, "Cache all data"); FileCache file_cache(capacity_metrics, cache_config, vcores, rate_limiter); for (const auto & obj : objects) { @@ -283,7 +285,8 @@ try } { - LOG_DEBUG(log, "Cache restore"); + // restore cache from local filesystem after process restart + LOG_INFO(log, "Cache restore"); FileCache file_cache(capacity_metrics, cache_config, vcores, rate_limiter); ASSERT_EQ(file_cache.cache_used, file_cache.cache_capacity); for (const auto & obj : objects) @@ -297,37 +300,43 @@ try } } - auto meta_objects = genObjects(/*store_count*/ 2, /*table_count*/ 2, /*file_count*/ 2, {"meta"}); - ASSERT_EQ(meta_objects.size(), 2 * 2 * 2); { - LOG_DEBUG(log, "Evict success"); + LOG_INFO(log, "Prepare for evict fail case"); + auto meta_objects2 = genObjects(/*store_count*/ 2, /*table_count*/ 2, /*file_count*/ 2, {"meta"}); + ASSERT_EQ(meta_objects2.size(), 2 * 2 * 2); + FileCache file_cache(capacity_metrics, cache_config, vcores, rate_limiter); - ASSERT_LE(file_cache.cache_used, file_cache.cache_capacity); - for (const auto & obj : meta_objects) + UInt64 free_size = file_cache.cache_capacity - file_cache.cache_used; + LOG_INFO(log, "Running evict failed cases, free_size={}", free_size); + // Keep the file_seg ptrs to mock reading in progress, it should prevent file_segment from being evicted. + auto file_seg = file_cache.getAll(); + for (const auto & obj : meta_objects2) { auto s3_fname = ::DB::S3::S3FilenameView::fromKey(obj.key); ASSERT_TRUE(s3_fname.isDataFile()) << obj.key; + // cache miss and try init background download auto file_seg = file_cache.get(s3_fname); - if (file_seg == nullptr) - { - waitForBgDownload(file_cache); - file_seg = file_cache.get(s3_fname); - } - ASSERT_NE(file_seg, nullptr) << obj.key; - ASSERT_TRUE(file_seg->isReadyToRead()); - ASSERT_EQ(file_seg->getSize(), obj.size); + ASSERT_EQ(file_seg, nullptr) << obj.key; + waitForBgDownload(file_cache); + // after bg download finished, try get again, should still miss as evict failed + file_seg = file_cache.get(s3_fname); + ASSERT_EQ(file_seg, nullptr) << fmt::format("key={} size={} free_size={}", obj.key, obj.size, free_size); + LOG_INFO(log, "Evict failed as expected, key={} size={} free_size={}", obj.key, obj.size, free_size); } + waitForBgDownload(file_cache); } - auto meta_objects2 = genObjects(/*store_count*/ 2, /*table_count*/ 2, /*file_count*/ 2, {"meta"}); - ASSERT_EQ(meta_objects2.size(), 2 * 2 * 2); { - LOG_DEBUG(log, "Evict failed"); + // Evict cached files + LOG_INFO(log, "Prepare for evict success case"); + auto meta_objects = genObjects(/*store_count*/ 2, /*table_count*/ 2, /*file_count*/ 2, {"meta"}); + ASSERT_EQ(meta_objects.size(), 2 * 2 * 2); + FileCache file_cache(capacity_metrics, cache_config, vcores, rate_limiter); ASSERT_LE(file_cache.cache_used, file_cache.cache_capacity); UInt64 free_size = file_cache.cache_capacity - file_cache.cache_used; - auto file_seg = file_cache.getAll(); // Prevent file_segment from evicted. - for (const auto & obj : meta_objects2) + LOG_INFO(log, "Running evict success cases, free_size={}", free_size); + for (const auto & obj : meta_objects) { auto s3_fname = ::DB::S3::S3FilenameView::fromKey(obj.key); ASSERT_TRUE(s3_fname.isDataFile()) << obj.key; @@ -336,20 +345,15 @@ try { waitForBgDownload(file_cache); file_seg = file_cache.get(s3_fname); - if (free_size > obj.size) - { - free_size -= obj.size; - ASSERT_EQ(free_size, file_cache.cache_capacity - file_cache.cache_used); - ASSERT_NE(file_seg, nullptr) << obj.key; - ASSERT_TRUE(file_seg->isReadyToRead()); - ASSERT_EQ(file_seg->getSize(), obj.size); - } - else - { - ASSERT_EQ(file_seg, nullptr) << obj.key; - } } + ASSERT_NE(file_seg, nullptr) << obj.key; + ASSERT_TRUE(file_seg->isReadyToRead()); + ASSERT_EQ(file_seg->getSize(), obj.size); } + ASSERT_LE(file_cache.cache_used, file_cache.cache_capacity); + free_size = file_cache.cache_capacity - file_cache.cache_used; + LOG_INFO(log, "After evict and cache new files, free_size={}", free_size); + waitForBgDownload(file_cache); } } From a4ce045e0490a2153756f84f8576b5903c2045f6 Mon Sep 17 00:00:00 2001 From: JaySon-Huang Date: Mon, 5 Jan 2026 22:56:15 +0800 Subject: [PATCH 4/6] Refine interface Signed-off-by: JaySon-Huang --- dbms/src/Storages/S3/FileCache.cpp | 22 ++++++++++++++++------ dbms/src/Storages/S3/FileCache.h | 16 +++++++++------- 2 files changed, 25 insertions(+), 13 deletions(-) diff --git a/dbms/src/Storages/S3/FileCache.cpp b/dbms/src/Storages/S3/FileCache.cpp index 321e2926973..e88dadd33d1 100644 --- a/dbms/src/Storages/S3/FileCache.cpp +++ b/dbms/src/Storages/S3/FileCache.cpp @@ -1027,10 +1027,15 @@ void FileCache::downloadImpl(const String & s3_key, FileSegmentPtr & file_seg, c SYNC_FOR("before_FileCache::downloadImpl_reserve_size"); if (!finalizeReservedSize(file_seg->getFileType(), file_seg->getSize(), content_length)) { - LOG_DEBUG(log, "s3_key={} finalizeReservedSize {}=>{} failed.", s3_key, file_seg->getSize(), content_length); + LOG_INFO( + log, + "Download finalizeReservedSize failed, s3_key={} seg_size={} size={}", + s3_key, + file_seg->getSize(), + content_length); + file_seg->setStatus(FileSegment::Status::Failed); return; } - file_seg->setSize(content_length); const auto & local_fname = file_seg->getLocalFileName(); // download as a temp file then rename to a formal file @@ -1038,19 +1043,24 @@ void FileCache::downloadImpl(const String & s3_key, FileSegmentPtr & file_seg, c auto temp_fname = toTemporaryFilename(local_fname); downloadToLocal(result.GetBody(), temp_fname, content_length, write_limiter); std::filesystem::rename(temp_fname, local_fname); - auto fsize = std::filesystem::file_size(local_fname); - capacity_metrics->addUsedSize(local_fname, fsize); +#ifndef NDEBUG + // sanity check under debug mode + auto fsize = std::filesystem::file_size(local_fname); RUNTIME_CHECK_MSG( fsize == static_cast(content_length), "local_fname={}, file_size={}, content_length={}", local_fname, fsize, content_length); - file_seg->setStatus(FileSegment::Status::Complete); +#endif + + capacity_metrics->addUsedSize(local_fname, content_length); + // update the file segment size and set as complete + file_seg->setComplete(content_length); LOG_INFO( log, - "Download s3_key={} to local={} size={} cost={}ms", + "Download success, s3_key={} local={} size={} cost={}ms", s3_key, local_fname, content_length, diff --git a/dbms/src/Storages/S3/FileCache.h b/dbms/src/Storages/S3/FileCache.h index 6aa36358c4a..6e1ef0fbb74 100644 --- a/dbms/src/Storages/S3/FileCache.h +++ b/dbms/src/Storages/S3/FileCache.h @@ -89,10 +89,12 @@ class FileSegment Status waitForNotEmpty(); - void setSize(UInt64 size_) + void setComplete(UInt64 size_) { std::lock_guard lock(mtx); size = size_; + status = FileSegment::Status::Complete; + cv_ready.notify_all(); } void setStatus(Status s) @@ -103,6 +105,12 @@ class FileSegment cv_ready.notify_all(); } + Status getStatus() const + { + std::lock_guard lock(mtx); + return status; + } + UInt64 getSize() const { std::lock_guard lock(mtx); @@ -133,12 +141,6 @@ class FileSegment return (std::chrono::system_clock::now() - last_access_time) < sec; } - Status getStatus() const - { - std::lock_guard lock(mtx); - return status; - } - auto getLastAccessTime() const { std::unique_lock lock(mtx); From 8096c475876405014fa83c5e99c87835ea59ab16 Mon Sep 17 00:00:00 2001 From: JaySon-Huang Date: Mon, 5 Jan 2026 23:20:23 +0800 Subject: [PATCH 5/6] Address comment Signed-off-by: JaySon-Huang --- dbms/src/Storages/S3/tests/gtest_filecache.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbms/src/Storages/S3/tests/gtest_filecache.cpp b/dbms/src/Storages/S3/tests/gtest_filecache.cpp index be728835456..a69149e97bd 100644 --- a/dbms/src/Storages/S3/tests/gtest_filecache.cpp +++ b/dbms/src/Storages/S3/tests/gtest_filecache.cpp @@ -309,7 +309,7 @@ try UInt64 free_size = file_cache.cache_capacity - file_cache.cache_used; LOG_INFO(log, "Running evict failed cases, free_size={}", free_size); // Keep the file_seg ptrs to mock reading in progress, it should prevent file_segment from being evicted. - auto file_seg = file_cache.getAll(); + auto all_file_segs = file_cache.getAll(); for (const auto & obj : meta_objects2) { auto s3_fname = ::DB::S3::S3FilenameView::fromKey(obj.key); From 96aa3fde4943688326c012635a761bd766be1f11 Mon Sep 17 00:00:00 2001 From: JaySon-Huang Date: Thu, 8 Jan 2026 22:06:25 +0800 Subject: [PATCH 6/6] Increase the buffer size to 128K Signed-off-by: JaySon-Huang --- dbms/src/Storages/S3/FileCache.cpp | 2 +- tests/docker/next-gen-utils/.gitignore | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/dbms/src/Storages/S3/FileCache.cpp b/dbms/src/Storages/S3/FileCache.cpp index e88dadd33d1..3f953639d8f 100644 --- a/dbms/src/Storages/S3/FileCache.cpp +++ b/dbms/src/Storages/S3/FileCache.cpp @@ -1000,7 +1000,7 @@ void downloadToLocal( return; GET_METRIC(tiflash_storage_remote_cache_bytes, type_dtfile_download_bytes).Increment(content_length); - static const Int64 MAX_BUFFER_SIZE = 16 * 1024; // 16k + static const Int64 MAX_BUFFER_SIZE = 128 * 1024; // 128k ReadBufferFromIStream rbuf(istr, std::min(content_length, MAX_BUFFER_SIZE)); WriteBufferFromWritableFile wbuf(ofile, std::min(content_length, MAX_BUFFER_SIZE)); copyData(rbuf, wbuf, content_length); diff --git a/tests/docker/next-gen-utils/.gitignore b/tests/docker/next-gen-utils/.gitignore index e935fd386e1..cf588ca4ecb 100644 --- a/tests/docker/next-gen-utils/.gitignore +++ b/tests/docker/next-gen-utils/.gitignore @@ -1 +1,2 @@ binaries +master_key