51 changes: 31 additions & 20 deletions dbms/src/Storages/S3/FileCache.cpp
@@ -21,17 +21,23 @@
#include <Common/TiFlashMetrics.h>
#include <Common/escapeForFileName.h>
#include <IO/BaseFile/PosixRandomAccessFile.h>
#include <IO/BaseFile/PosixWritableFile.h>
#include <IO/BaseFile/RateLimiter.h>
#include <IO/Buffer/ReadBufferFromIStream.h>
#include <IO/Buffer/WriteBufferFromWritableFile.h>
#include <IO/IOThreadPools.h>
#include <IO/copyData.h>
#include <Interpreters/Settings.h>
#include <Server/StorageConfigParser.h>
#include <Storages/DeltaMerge/DeltaMergeDefines.h>
#include <Storages/DeltaMerge/File/DMFile.h>
#include <Storages/S3/FileCache.h>
#include <Storages/S3/FileCachePerf.h>
#include <Storages/S3/S3Common.h>
#include <aws/core/utils/memory/stl/AWSStreamFwd.h>
#include <aws/s3/model/GetObjectRequest.h>
#include <common/logger_useful.h>
#include <fcntl.h>
#include <fmt/chrono.h>

#include <atomic>
@@ -980,6 +986,27 @@ bool FileCache::finalizeReservedSize(FileType reserve_for, UInt64 reserved_size,
return true;
}

void downloadToLocal(
Aws::IOStream & istr,
const String & fname,
Int64 content_length,
const WriteLimiterPtr & write_limiter)
{
// Create the file with write_limiter.
// Each time `ofile.write` is called, the write speed is controlled by the write_limiter.
auto ofile = std::make_shared<PosixWritableFile>(fname, true, O_CREAT | O_WRONLY, 0666, write_limiter);
// for a zero-length object, just leave the newly created empty file
if (unlikely(content_length <= 0))
return;

GET_METRIC(tiflash_storage_remote_cache_bytes, type_dtfile_download_bytes).Increment(content_length);
static const Int64 MAX_BUFFER_SIZE = 16 * 1024; // 16k
ReadBufferFromIStream rbuf(istr, std::min(content_length, MAX_BUFFER_SIZE));
WriteBufferFromWritableFile wbuf(ofile, std::min(content_length, MAX_BUFFER_SIZE));
copyData(rbuf, wbuf, content_length);
wbuf.sync();
}
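Editor's note (not part of this change): the behavioral difference from the removed code further down is that the old path charged the write limiter once for the entire object (`write_limiter->request(content_length)`) and then streamed the body in a single unthrottled `ostr << rdbuf()` call, while the new helper hands the limiter to `PosixWritableFile`, so every 16 KiB chunk written by `copyData` is throttled individually. A minimal sketch of that per-chunk idea; it assumes only a blocking `request(bytes)` method on the limiter, and the helper name and raw `::write` call are illustrative, not TiFlash's actual code:

```cpp
#include <algorithm>
#include <cstddef>
#include <unistd.h>

// Sketch: charge the limiter per chunk instead of once up front, so the copy
// speed tracks the limiter's configured rate for the whole duration.
template <typename Limiter>
void throttledWrite(Limiter & limiter, int fd, const char * data, size_t len)
{
    constexpr size_t CHUNK = 16 * 1024; // mirrors MAX_BUFFER_SIZE above
    for (size_t off = 0; off < len; off += CHUNK)
    {
        const size_t n = std::min(CHUNK, len - off);
        limiter.request(n);                           // block until n bytes are granted
        ssize_t written = ::write(fd, data + off, n); // error handling omitted in this sketch
        (void)written;
    }
}
```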

void FileCache::downloadImpl(const String & s3_key, FileSegmentPtr & file_seg, const WriteLimiterPtr & write_limiter)
{
Stopwatch sw;
@@ -1006,29 +1033,13 @@ void FileCache::downloadImpl(const String & s3_key, FileSegmentPtr & file_seg, c
file_seg->setSize(content_length);

const auto & local_fname = file_seg->getLocalFileName();
// download to a temp file, then rename it to the final file
prepareParentDir(local_fname);
auto temp_fname = toTemporaryFilename(local_fname);
{
Aws::OFStream ostr(temp_fname, std::ios_base::out | std::ios_base::binary);
RUNTIME_CHECK_MSG(ostr.is_open(), "Open {} failed: {}", temp_fname, strerror(errno));
if (content_length > 0)
{
if (write_limiter)
write_limiter->request(content_length);
GET_METRIC(tiflash_storage_remote_cache_bytes, type_dtfile_download_bytes).Increment(content_length);
ostr << result.GetBody().rdbuf();
// If content_length == 0, ostr.good() is false. The reason is unknown.
RUNTIME_CHECK_MSG(
ostr.good(),
"Write {} content_length {} failed: {}",
temp_fname,
content_length,
strerror(errno));
ostr.flush();
}
}
downloadToLocal(result.GetBody(), temp_fname, content_length, write_limiter);
std::filesystem::rename(temp_fname, local_fname);
auto fsize = std::filesystem::file_size(local_fname);

capacity_metrics->addUsedSize(local_fname, fsize);
RUNTIME_CHECK_MSG(
fsize == static_cast<UInt64>(content_length),
@@ -1037,7 +1048,7 @@ void FileCache::downloadImpl(const String & s3_key, FileSegmentPtr & file_seg, c
fsize,
content_length);
file_seg->setStatus(FileSegment::Status::Complete);
LOG_DEBUG(
LOG_INFO(
log,
"Download s3_key={} to local={} size={} cost={}ms",
s3_key,
78 changes: 41 additions & 37 deletions dbms/src/Storages/S3/tests/gtest_filecache.cpp
@@ -113,7 +113,7 @@ class FileCacheTest : public ::testing::Test
}
auto r = file.fsync();
ASSERT_EQ(r, 0);
LOG_DEBUG(log, "write fname={} size={} done, cost={}s", key, size, sw.elapsedSeconds());
LOG_DEBUG(log, "write fname={} size={} done, cost={:.3f}s", key, size, sw.elapsedSeconds());
}

void writeS3FileWithSize(const S3Filename & s3_dir, std::string_view file_name, size_t size)
@@ -199,14 +199,15 @@ class FileCacheTest : public ::testing::Test
{
std::this_thread::sleep_for(1000ms);
}
LOG_DEBUG(
LOG_INFO(
log,
"Download summary: succ={} fail={} cost={}s",
"Download summary: succ={} fail={} cost={:.3f}s",
file_cache.bg_download_succ_count.load(std::memory_order_relaxed),
file_cache.bg_download_fail_count.load(std::memory_order_relaxed),
sw.elapsedSeconds());
}

// Update config.capacity so that the DTFile cache capacity equals `dt_size`.
static void calculateCacheCapacity(StorageRemoteCacheConfig & config, UInt64 dt_size)
{
config.capacity = dt_size / (1.0 - config.delta_rate);
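Editor's note, a hypothetical worked example of this capacity formula (illustrative numbers; it assumes `getDTFileCapacity()` returns `capacity * (1 - delta_rate)`, which is what this helper compensates for):

```cpp
// Inside the test fixture, with delta_rate = 0.25 and dt_size = 96:
StorageRemoteCacheConfig cfg;
cfg.delta_rate = 0.25;
calculateCacheCapacity(cfg, /*dt_size=*/96);
// cfg.capacity == 96 / (1.0 - 0.25) == 128, so the DTFile share
// 128 * (1 - 0.25) == 96, i.e. exactly dt_size.
```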
@@ -248,18 +249,19 @@ try
Stopwatch sw;
auto objects = genObjects(/*store_count*/ 1, /*table_count*/ 1, /*file_count*/ 1, basenames);
auto total_size = objectsTotalSize(objects);
LOG_DEBUG(log, "genObjects: count={} total_size={} cost={}s", objects.size(), total_size, sw.elapsedSeconds());
LOG_INFO(log, "genObjects: count={} total_size={} cost={:.3f}s", objects.size(), total_size, sw.elapsedSeconds());

auto cache_dir = fmt::format("{}/file_cache_all", tmp_dir);
StorageRemoteCacheConfig cache_config{.dir = cache_dir, .dtfile_level = 100};
calculateCacheCapacity(cache_config, total_size);
LOG_DEBUG(log, "total_size={} dt_cache_capacity={}", total_size, cache_config.getDTFileCapacity());
LOG_INFO(log, "total_size={} dt_cache_capacity={}", total_size, cache_config.getDTFileCapacity());

UInt16 vcores = 4;
IORateLimiter rate_limiter;

{
LOG_DEBUG(log, "Cache all data");
// download all files to the local filesystem as cache
LOG_INFO(log, "Cache all data");
FileCache file_cache(capacity_metrics, cache_config, vcores, rate_limiter);
for (const auto & obj : objects)
{
@@ -283,7 +285,8 @@ try
}

{
LOG_DEBUG(log, "Cache restore");
// restore the cache from the local filesystem after a process restart
LOG_INFO(log, "Cache restore");
FileCache file_cache(capacity_metrics, cache_config, vcores, rate_limiter);
ASSERT_EQ(file_cache.cache_used, file_cache.cache_capacity);
for (const auto & obj : objects)
@@ -297,37 +300,43 @@ try
}
}

auto meta_objects = genObjects(/*store_count*/ 2, /*table_count*/ 2, /*file_count*/ 2, {"meta"});
ASSERT_EQ(meta_objects.size(), 2 * 2 * 2);
{
LOG_DEBUG(log, "Evict success");
LOG_INFO(log, "Prepare for evict fail case");
auto meta_objects2 = genObjects(/*store_count*/ 2, /*table_count*/ 2, /*file_count*/ 2, {"meta"});
ASSERT_EQ(meta_objects2.size(), 2 * 2 * 2);

FileCache file_cache(capacity_metrics, cache_config, vcores, rate_limiter);
ASSERT_LE(file_cache.cache_used, file_cache.cache_capacity);
for (const auto & obj : meta_objects)
UInt64 free_size = file_cache.cache_capacity - file_cache.cache_used;
LOG_INFO(log, "Running evict failed cases, free_size={}", free_size);
// Keep the file_seg ptrs to mock reads in progress; this should prevent the file segments from being evicted.
auto file_seg = file_cache.getAll();
for (const auto & obj : meta_objects2)
{
auto s3_fname = ::DB::S3::S3FilenameView::fromKey(obj.key);
ASSERT_TRUE(s3_fname.isDataFile()) << obj.key;
// cache miss; this triggers a background download attempt
auto file_seg = file_cache.get(s3_fname);
if (file_seg == nullptr)
{
waitForBgDownload(file_cache);
file_seg = file_cache.get(s3_fname);
}
ASSERT_NE(file_seg, nullptr) << obj.key;
ASSERT_TRUE(file_seg->isReadyToRead());
ASSERT_EQ(file_seg->getSize(), obj.size);
ASSERT_EQ(file_seg, nullptr) << obj.key;
waitForBgDownload(file_cache);
// after the bg download finishes, get again; it should still miss because eviction failed
file_seg = file_cache.get(s3_fname);
ASSERT_EQ(file_seg, nullptr) << fmt::format("key={} size={} free_size={}", obj.key, obj.size, free_size);
LOG_INFO(log, "Evict failed as expected, key={} size={} free_size={}", obj.key, obj.size, free_size);
}
waitForBgDownload(file_cache);
}

auto meta_objects2 = genObjects(/*store_count*/ 2, /*table_count*/ 2, /*file_count*/ 2, {"meta"});
ASSERT_EQ(meta_objects2.size(), 2 * 2 * 2);
{
LOG_DEBUG(log, "Evict failed");
// Evict cached files
LOG_INFO(log, "Prepare for evict success case");
auto meta_objects = genObjects(/*store_count*/ 2, /*table_count*/ 2, /*file_count*/ 2, {"meta"});
ASSERT_EQ(meta_objects.size(), 2 * 2 * 2);

FileCache file_cache(capacity_metrics, cache_config, vcores, rate_limiter);
ASSERT_LE(file_cache.cache_used, file_cache.cache_capacity);
UInt64 free_size = file_cache.cache_capacity - file_cache.cache_used;
auto file_seg = file_cache.getAll(); // Prevent file_segment from being evicted.
for (const auto & obj : meta_objects2)
LOG_INFO(log, "Running evict success cases, free_size={}", free_size);
for (const auto & obj : meta_objects)
{
auto s3_fname = ::DB::S3::S3FilenameView::fromKey(obj.key);
ASSERT_TRUE(s3_fname.isDataFile()) << obj.key;
@@ -336,20 +345,15 @@ try
{
waitForBgDownload(file_cache);
file_seg = file_cache.get(s3_fname);
if (free_size > obj.size)
{
free_size -= obj.size;
ASSERT_EQ(free_size, file_cache.cache_capacity - file_cache.cache_used);
ASSERT_NE(file_seg, nullptr) << obj.key;
ASSERT_TRUE(file_seg->isReadyToRead());
ASSERT_EQ(file_seg->getSize(), obj.size);
}
else
{
ASSERT_EQ(file_seg, nullptr) << obj.key;
}
}
ASSERT_NE(file_seg, nullptr) << obj.key;
ASSERT_TRUE(file_seg->isReadyToRead());
ASSERT_EQ(file_seg->getSize(), obj.size);
}
ASSERT_LE(file_cache.cache_used, file_cache.cache_capacity);
free_size = file_cache.cache_capacity - file_cache.cache_used;
LOG_INFO(log, "After evict and cache new files, free_size={}", free_size);

waitForBgDownload(file_cache);
}
}
Expand Down