Merged
71 changes: 46 additions & 25 deletions dbms/src/Storages/S3/FileCache.cpp
@@ -21,17 +21,23 @@
#include <Common/TiFlashMetrics.h>
#include <Common/escapeForFileName.h>
#include <IO/BaseFile/PosixRandomAccessFile.h>
#include <IO/BaseFile/PosixWritableFile.h>
#include <IO/BaseFile/RateLimiter.h>
#include <IO/Buffer/ReadBufferFromIStream.h>
#include <IO/Buffer/WriteBufferFromWritableFile.h>
#include <IO/IOThreadPools.h>
#include <IO/copyData.h>
#include <Interpreters/Settings.h>
#include <Server/StorageConfigParser.h>
#include <Storages/DeltaMerge/DeltaMergeDefines.h>
#include <Storages/DeltaMerge/File/DMFile.h>
#include <Storages/S3/FileCache.h>
#include <Storages/S3/FileCachePerf.h>
#include <Storages/S3/S3Common.h>
#include <aws/core/utils/memory/stl/AWSStreamFwd.h>
#include <aws/s3/model/GetObjectRequest.h>
#include <common/logger_useful.h>
#include <fcntl.h>
#include <fmt/chrono.h>

#include <atomic>
@@ -980,6 +986,27 @@ bool FileCache::finalizeReservedSize(FileType reserve_for, UInt64 reserved_size,
return true;
}

void downloadToLocal(
Aws::IOStream & istr,
const String & fname,
Int64 content_length,
const WriteLimiterPtr & write_limiter)
{
// Create the file with the write_limiter attached:
// each call to `ofile.write` below is throttled by the write_limiter.
auto ofile = std::make_shared<PosixWritableFile>(fname, true, O_CREAT | O_WRONLY, 0666, write_limiter);
// nothing to copy; the empty file created above is all that is needed
if (unlikely(content_length <= 0))
return;

GET_METRIC(tiflash_storage_remote_cache_bytes, type_dtfile_download_bytes).Increment(content_length);
static const Int64 MAX_BUFFER_SIZE = 128 * 1024; // 128k
ReadBufferFromIStream rbuf(istr, std::min(content_length, MAX_BUFFER_SIZE));
WriteBufferFromWritableFile wbuf(ofile, std::min(content_length, MAX_BUFFER_SIZE));
copyData(rbuf, wbuf, content_length);
wbuf.sync();
}

void FileCache::downloadImpl(const String & s3_key, FileSegmentPtr & file_seg, const WriteLimiterPtr & write_limiter)
{
Stopwatch sw;
@@ -1000,46 +1027,40 @@ void FileCache::downloadImpl(const String & s3_key, FileSegmentPtr & file_seg, c
SYNC_FOR("before_FileCache::downloadImpl_reserve_size");
if (!finalizeReservedSize(file_seg->getFileType(), file_seg->getSize(), content_length))
{
LOG_DEBUG(log, "s3_key={} finalizeReservedSize {}=>{} failed.", s3_key, file_seg->getSize(), content_length);
LOG_INFO(
log,
"Download finalizeReservedSize failed, s3_key={} seg_size={} size={}",
s3_key,
file_seg->getSize(),
content_length);
file_seg->setStatus(FileSegment::Status::Failed);
return;
}
file_seg->setSize(content_length);

const auto & local_fname = file_seg->getLocalFileName();
// download to a temporary file, then rename it to the final file
prepareParentDir(local_fname);
auto temp_fname = toTemporaryFilename(local_fname);
{
Aws::OFStream ostr(temp_fname, std::ios_base::out | std::ios_base::binary);
RUNTIME_CHECK_MSG(ostr.is_open(), "Open {} failed: {}", temp_fname, strerror(errno));
if (content_length > 0)
{
if (write_limiter)
write_limiter->request(content_length);
GET_METRIC(tiflash_storage_remote_cache_bytes, type_dtfile_download_bytes).Increment(content_length);
ostr << result.GetBody().rdbuf();
// If content_length == 0, ostr.good() is false. The reason is unknown.
RUNTIME_CHECK_MSG(
ostr.good(),
"Write {} content_length {} failed: {}",
temp_fname,
content_length,
strerror(errno));
ostr.flush();
}
}
downloadToLocal(result.GetBody(), temp_fname, content_length, write_limiter);
std::filesystem::rename(temp_fname, local_fname);

#ifndef NDEBUG
// sanity check under debug mode
auto fsize = std::filesystem::file_size(local_fname);
capacity_metrics->addUsedSize(local_fname, fsize);
RUNTIME_CHECK_MSG(
fsize == static_cast<UInt64>(content_length),
"local_fname={}, file_size={}, content_length={}",
local_fname,
fsize,
content_length);
file_seg->setStatus(FileSegment::Status::Complete);
LOG_DEBUG(
#endif

capacity_metrics->addUsedSize(local_fname, content_length);
// update the file segment size and mark it complete
file_seg->setComplete(content_length);
LOG_INFO(
log,
"Download s3_key={} to local={} size={} cost={}ms",
"Download success, s3_key={} local={} size={} cost={}ms",
s3_key,
local_fname,
content_length,
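The change in download behavior is worth spelling out: the old code issued a single write_limiter->request(content_length) up front and then streamed the whole S3 body through one unthrottled `ostr << rdbuf()`, while the new downloadToLocal writes through a PosixWritableFile in 128 KiB chunks so the limiter paces every write. Below is a minimal, self-contained sketch of that pattern plus the temp-file-then-rename step, assuming a hypothetical SimpleWriteLimiter in place of TiFlash's WriteLimiter and plain iostreams in place of ReadBufferFromIStream/WriteBufferFromWritableFile:

#include <algorithm>
#include <chrono>
#include <cstdint>
#include <filesystem>
#include <fstream>
#include <istream>
#include <string>
#include <thread>
#include <vector>

// Hypothetical stand-in for TiFlash's WriteLimiter (IO/BaseFile/RateLimiter.h):
// naive fixed-rate pacing, enough to show where throttling hooks in.
class SimpleWriteLimiter
{
public:
    explicit SimpleWriteLimiter(int64_t bytes_per_sec_) : bytes_per_sec(bytes_per_sec_) {}

    // Block long enough that, on average, at most bytes_per_sec are written per second.
    void request(int64_t bytes)
    {
        std::this_thread::sleep_for(
            std::chrono::duration<double>(static_cast<double>(bytes) / static_cast<double>(bytes_per_sec)));
    }

private:
    int64_t bytes_per_sec;
};

// Chunked copy: the limiter paces every write instead of sleeping once up front.
void downloadToLocalSketch(
    std::istream & istr, const std::string & fname, int64_t content_length, SimpleWriteLimiter & limiter)
{
    std::ofstream ofile(fname, std::ios::binary | std::ios::trunc); // creates an empty file
    if (content_length <= 0)
        return;

    constexpr int64_t MAX_BUFFER_SIZE = 128 * 1024; // 128 KiB, as in the PR
    std::vector<char> buf(static_cast<size_t>(std::min(content_length, MAX_BUFFER_SIZE)));
    int64_t remaining = content_length;
    while (remaining > 0)
    {
        const auto to_read = std::min<int64_t>(remaining, static_cast<int64_t>(buf.size()));
        istr.read(buf.data(), static_cast<std::streamsize>(to_read));
        const auto got = istr.gcount();
        if (got <= 0)
            break; // premature EOF; the real code raises an error instead
        limiter.request(got); // throttle before each write
        ofile.write(buf.data(), got);
        remaining -= got;
    }
    ofile.flush();
}

// Write to a temp name, then rename: readers never see a half-written cache file.
void downloadAtomicallySketch(
    std::istream & istr, const std::string & final_name, int64_t content_length, SimpleWriteLimiter & limiter)
{
    const std::string temp_name = final_name + ".tmp"; // the PR uses toTemporaryFilename() instead
    downloadToLocalSketch(istr, temp_name, content_length, limiter);
    std::filesystem::rename(temp_name, final_name); // atomic within one filesystem on POSIX
}

The rename step relies on std::filesystem::rename being atomic when source and destination sit on the same filesystem, which is why the temporary file is created next to its target rather than in a separate temp directory.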
16 changes: 9 additions & 7 deletions dbms/src/Storages/S3/FileCache.h
@@ -89,10 +89,12 @@ class FileSegment

Status waitForNotEmpty();

void setSize(UInt64 size_)
void setComplete(UInt64 size_)
{
std::lock_guard lock(mtx);
size = size_;
status = FileSegment::Status::Complete;
cv_ready.notify_all();
}

void setStatus(Status s)
@@ -103,6 +105,12 @@
cv_ready.notify_all();
}

Status getStatus() const
{
std::lock_guard lock(mtx);
return status;
}

UInt64 getSize() const
{
std::lock_guard lock(mtx);
@@ -133,12 +141,6 @@
return (std::chrono::system_clock::now() - last_access_time) < sec;
}

Status getStatus() const
{
std::lock_guard lock(mtx);
return status;
}

auto getLastAccessTime() const
{
std::unique_lock lock(mtx);
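A note on the FileSegment change: previously downloadImpl called setSize(content_length) and, much later, setStatus(Status::Complete) in two separate critical sections, and setSize did not notify at all, so a waiter could in principle observe the new size with a stale status. setComplete publishes both under one lock before waking cv_ready. A self-contained sketch of the monitor pattern, with the status set reduced to the values visible in this diff and Empty as the assumed initial state:

#include <condition_variable>
#include <cstdint>
#include <mutex>

class FileSegmentSketch
{
public:
    enum class Status { Empty, Complete, Failed };

    // Publish size and final status together so waiters never see one without the other.
    void setComplete(uint64_t size_)
    {
        std::lock_guard lock(mtx);
        size = size_;
        status = Status::Complete;
        cv_ready.notify_all();
    }

    void setStatus(Status s)
    {
        std::lock_guard lock(mtx);
        status = s;
        cv_ready.notify_all();
    }

    // Block until the segment leaves Empty, i.e. the download succeeded or failed.
    Status waitForNotEmpty()
    {
        std::unique_lock lock(mtx);
        cv_ready.wait(lock, [this] { return status != Status::Empty; });
        return status;
    }

    Status getStatus() const
    {
        std::lock_guard lock(mtx);
        return status;
    }

private:
    mutable std::mutex mtx;
    std::condition_variable cv_ready;
    Status status = Status::Empty;
    uint64_t size = 0;
};

Notifying while still holding the lock mirrors the PR's style; releasing the lock before notify_all would also be correct and can save woken waiters an immediate block on the mutex.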
78 changes: 41 additions & 37 deletions dbms/src/Storages/S3/tests/gtest_filecache.cpp
@@ -113,7 +113,7 @@ class FileCacheTest : public ::testing::Test
}
auto r = file.fsync();
ASSERT_EQ(r, 0);
LOG_DEBUG(log, "write fname={} size={} done, cost={}s", key, size, sw.elapsedSeconds());
LOG_DEBUG(log, "write fname={} size={} done, cost={:.3f}s", key, size, sw.elapsedSeconds());
}

void writeS3FileWithSize(const S3Filename & s3_dir, std::string_view file_name, size_t size)
@@ -199,14 +199,15 @@
{
std::this_thread::sleep_for(1000ms);
}
LOG_DEBUG(
LOG_INFO(
log,
"Download summary: succ={} fail={} cost={}s",
"Download summary: succ={} fail={} cost={:.3f}s",
file_cache.bg_download_succ_count.load(std::memory_order_relaxed),
file_cache.bg_download_fail_count.load(std::memory_order_relaxed),
sw.elapsedSeconds());
}

// Update config.capacity so that the dtfile cache capacity equals `dt_size`.
static void calculateCacheCapacity(StorageRemoteCacheConfig & config, UInt64 dt_size)
{
config.capacity = dt_size / (1.0 - config.delta_rate);
@@ -248,18 +249,19 @@
Stopwatch sw;
auto objects = genObjects(/*store_count*/ 1, /*table_count*/ 1, /*file_count*/ 1, basenames);
auto total_size = objectsTotalSize(objects);
LOG_DEBUG(log, "genObjects: count={} total_size={} cost={}s", objects.size(), total_size, sw.elapsedSeconds());
LOG_INFO(log, "genObjects: count={} total_size={} cost={:.3f}s", objects.size(), total_size, sw.elapsedSeconds());

auto cache_dir = fmt::format("{}/file_cache_all", tmp_dir);
StorageRemoteCacheConfig cache_config{.dir = cache_dir, .dtfile_level = 100};
calculateCacheCapacity(cache_config, total_size);
LOG_DEBUG(log, "total_size={} dt_cache_capacity={}", total_size, cache_config.getDTFileCapacity());
LOG_INFO(log, "total_size={} dt_cache_capacity={}", total_size, cache_config.getDTFileCapacity());

UInt16 vcores = 4;
IORateLimiter rate_limiter;

{
LOG_DEBUG(log, "Cache all data");
// download all files to the local filesystem as cache
LOG_INFO(log, "Cache all data");
FileCache file_cache(capacity_metrics, cache_config, vcores, rate_limiter);
for (const auto & obj : objects)
{
@@ -283,7 +285,8 @@
}

{
LOG_DEBUG(log, "Cache restore");
// restore cache from local filesystem after process restart
LOG_INFO(log, "Cache restore");
FileCache file_cache(capacity_metrics, cache_config, vcores, rate_limiter);
ASSERT_EQ(file_cache.cache_used, file_cache.cache_capacity);
for (const auto & obj : objects)
@@ -297,37 +300,43 @@
}
}

auto meta_objects = genObjects(/*store_count*/ 2, /*table_count*/ 2, /*file_count*/ 2, {"meta"});
ASSERT_EQ(meta_objects.size(), 2 * 2 * 2);
{
LOG_DEBUG(log, "Evict success");
LOG_INFO(log, "Prepare for evict fail case");
auto meta_objects2 = genObjects(/*store_count*/ 2, /*table_count*/ 2, /*file_count*/ 2, {"meta"});
ASSERT_EQ(meta_objects2.size(), 2 * 2 * 2);

FileCache file_cache(capacity_metrics, cache_config, vcores, rate_limiter);
ASSERT_LE(file_cache.cache_used, file_cache.cache_capacity);
for (const auto & obj : meta_objects)
UInt64 free_size = file_cache.cache_capacity - file_cache.cache_used;
LOG_INFO(log, "Running evict failed cases, free_size={}", free_size);
// Keep the file_seg pointers to mimic reads in progress; this prevents the file segments from being evicted.
auto all_file_segs = file_cache.getAll();
for (const auto & obj : meta_objects2)
{
auto s3_fname = ::DB::S3::S3FilenameView::fromKey(obj.key);
ASSERT_TRUE(s3_fname.isDataFile()) << obj.key;
// cache miss; this also kicks off a background download
auto file_seg = file_cache.get(s3_fname);
if (file_seg == nullptr)
{
waitForBgDownload(file_cache);
file_seg = file_cache.get(s3_fname);
}
ASSERT_NE(file_seg, nullptr) << obj.key;
ASSERT_TRUE(file_seg->isReadyToRead());
ASSERT_EQ(file_seg->getSize(), obj.size);
ASSERT_EQ(file_seg, nullptr) << obj.key;
waitForBgDownload(file_cache);
// after the bg download finishes, get again; it should still miss because eviction failed
file_seg = file_cache.get(s3_fname);
ASSERT_EQ(file_seg, nullptr) << fmt::format("key={} size={} free_size={}", obj.key, obj.size, free_size);
LOG_INFO(log, "Evict failed as expected, key={} size={} free_size={}", obj.key, obj.size, free_size);
}
waitForBgDownload(file_cache);
}

auto meta_objects2 = genObjects(/*store_count*/ 2, /*table_count*/ 2, /*file_count*/ 2, {"meta"});
ASSERT_EQ(meta_objects2.size(), 2 * 2 * 2);
{
LOG_DEBUG(log, "Evict failed");
// Evict cached files
LOG_INFO(log, "Prepare for evict success case");
auto meta_objects = genObjects(/*store_count*/ 2, /*table_count*/ 2, /*file_count*/ 2, {"meta"});
ASSERT_EQ(meta_objects.size(), 2 * 2 * 2);

FileCache file_cache(capacity_metrics, cache_config, vcores, rate_limiter);
ASSERT_LE(file_cache.cache_used, file_cache.cache_capacity);
UInt64 free_size = file_cache.cache_capacity - file_cache.cache_used;
auto file_seg = file_cache.getAll(); // Prevent file_segment from being evicted.
for (const auto & obj : meta_objects2)
LOG_INFO(log, "Running evict success cases, free_size={}", free_size);
for (const auto & obj : meta_objects)
{
auto s3_fname = ::DB::S3::S3FilenameView::fromKey(obj.key);
ASSERT_TRUE(s3_fname.isDataFile()) << obj.key;
@@ -336,20 +345,15 @@
{
waitForBgDownload(file_cache);
file_seg = file_cache.get(s3_fname);
if (free_size > obj.size)
{
free_size -= obj.size;
ASSERT_EQ(free_size, file_cache.cache_capacity - file_cache.cache_used);
ASSERT_NE(file_seg, nullptr) << obj.key;
ASSERT_TRUE(file_seg->isReadyToRead());
ASSERT_EQ(file_seg->getSize(), obj.size);
}
else
{
ASSERT_EQ(file_seg, nullptr) << obj.key;
}
}
ASSERT_NE(file_seg, nullptr) << obj.key;
ASSERT_TRUE(file_seg->isReadyToRead());
ASSERT_EQ(file_seg->getSize(), obj.size);
}
ASSERT_LE(file_cache.cache_used, file_cache.cache_capacity);
free_size = file_cache.cache_capacity - file_cache.cache_used;
LOG_INFO(log, "After evict and cache new files, free_size={}", free_size);

waitForBgDownload(file_cache);
}
}
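For reference, the capacity arithmetic the test leans on: DTFiles receive a (1 - delta_rate) share of the total cache, so calculateCacheCapacity inverts that share to make the DTFile capacity land exactly on dt_size, which is what lets the test fill the cache precisely and drive the evict-success and evict-fail cases. A worked sketch, under the assumption that getDTFileCapacity() returns capacity * (1.0 - delta_rate); field names, the delta_rate value, and the rounding are simplifications:

#include <cassert>
#include <cmath>
#include <cstdint>

// Sketch of the capacity arithmetic; only the formula comes from the test above.
struct CacheConfigSketch
{
    double delta_rate = 0.2; // share of the cache reserved for delta data (example value)
    uint64_t capacity = 0;   // total cache capacity in bytes

    // Assumption: DTFiles get the non-delta share of the total capacity.
    uint64_t getDTFileCapacity() const
    {
        return static_cast<uint64_t>(std::llround(capacity * (1.0 - delta_rate)));
    }
};

int main()
{
    CacheConfigSketch config;
    const uint64_t dt_size = 800;
    // Invert the share: capacity = dt_size / (1 - delta_rate) = 800 / 0.8 = 1000.
    config.capacity = static_cast<uint64_t>(std::llround(dt_size / (1.0 - config.delta_rate)));
    assert(config.getDTFileCapacity() == dt_size); // 1000 * 0.8 == 800
    return 0;
}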
1 change: 1 addition & 0 deletions tests/docker/next-gen-utils/.gitignore
@@ -1 +1,2 @@
binaries
master_key