67 changes: 46 additions & 21 deletions dbms/src/Storages/S3/FileCache.cpp
@@ -21,7 +21,9 @@
#include <Common/TiFlashMetrics.h>
#include <Common/escapeForFileName.h>
#include <IO/BaseFile/PosixRandomAccessFile.h>
#include <IO/BaseFile/PosixWritableFile.h>
#include <IO/BaseFile/RateLimiter.h>
#include <IO/Buffer/ReadBufferFromIStream.h>
#include <IO/IOThreadPools.h>
#include <Interpreters/Settings.h>
#include <Server/StorageConfigParser.h>
@@ -30,11 +32,14 @@
#include <Storages/S3/FileCache.h>
#include <Storages/S3/FileCachePerf.h>
#include <Storages/S3/S3Common.h>
#include <aws/core/utils/memory/stl/AWSStreamFwd.h>
#include <aws/s3/model/GetObjectRequest.h>
#include <common/logger_useful.h>
#include <fcntl.h>
#include <fmt/chrono.h>

#include <atomic>
#include <cerrno>
#include <chrono>
#include <cmath>
#include <filesystem>
@@ -980,6 +985,43 @@ bool FileCache::finalizeReservedSize(FileType reserve_for, UInt64 reserved_size,
return true;
}

size_t downloadToLocal(
Aws::IOStream & istr,
const String & fname,
Int64 content_length,
const WriteLimiterPtr & write_limiter)
{
// Create the target file with the write_limiter attached:
// each time `ofile.write` is called, the write speed is controlled by the write_limiter.
PosixWritableFile ofile(fname, true, O_CREAT | O_WRONLY, 0666, write_limiter);
// nothing to download; the empty file created above is all that is needed
if (unlikely(content_length <= 0))
return 0;

GET_METRIC(tiflash_storage_remote_cache_bytes, type_dtfile_download_bytes).Increment(content_length);
size_t total_written = 0;
static const Int64 MAX_BUFFER_SIZE = 16 * 1024; // 16k
ReadBufferFromIStream rbuf(istr, std::min(content_length, MAX_BUFFER_SIZE));
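// Stream the body in bounded chunks: each refill of `rbuf` pulls at most MAX_BUFFER_SIZE
// bytes from `istr`, and each chunk is written out through the rate-limited file below.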
while (!rbuf.eof())
{
size_t count = rbuf.buffer().end() - rbuf.position();
if (ssize_t write_res = ofile.write(rbuf.position(), count); write_res < 0)
{
throwFromErrno(fmt::format("write to file failed, fname={}", fname), write_res, errno);
}
else
{
total_written += write_res;
}
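// Mark the whole buffer as consumed so the next eof() check refills it from the stream.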
rbuf.position() += count;
}
if (auto res = ofile.fsync(); res < 0)
{
throwFromErrno(fmt::format("fsync file failed, fname={}", fname), res, errno);
}
return total_written;
}
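// downloadToLocal returns the number of bytes actually written; downloadImpl below
// checks this value against the S3 content_length to catch truncated downloads.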

void FileCache::downloadImpl(const String & s3_key, FileSegmentPtr & file_seg, const WriteLimiterPtr & write_limiter)
{
Stopwatch sw;
@@ -1007,28 +1049,11 @@ void FileCache::downloadImpl(const String & s3_key, FileSegmentPtr & file_seg, c

const auto & local_fname = file_seg->getLocalFileName();
prepareParentDir(local_fname);
// download as a temp file then rename to a formal file
auto temp_fname = toTemporaryFilename(local_fname);
{
Aws::OFStream ostr(temp_fname, std::ios_base::out | std::ios_base::binary);
RUNTIME_CHECK_MSG(ostr.is_open(), "Open {} failed: {}", temp_fname, strerror(errno));
if (content_length > 0)
{
if (write_limiter)
write_limiter->request(content_length);
GET_METRIC(tiflash_storage_remote_cache_bytes, type_dtfile_download_bytes).Increment(content_length);
ostr << result.GetBody().rdbuf();
// If content_length == 0, ostr.good() is false. The reason is unknown.
RUNTIME_CHECK_MSG(
ostr.good(),
"Write {} content_length {} failed: {}",
temp_fname,
content_length,
strerror(errno));
ostr.flush();
}
}
size_t fsize = downloadToLocal(result.GetBody(), temp_fname, content_length, write_limiter);
std::filesystem::rename(temp_fname, local_fname);
auto fsize = std::filesystem::file_size(local_fname);

capacity_metrics->addUsedSize(local_fname, fsize);
RUNTIME_CHECK_MSG(
fsize == static_cast<UInt64>(content_length),
@@ -1037,7 +1062,7 @@ void FileCache::downloadImpl(const String & s3_key, FileSegmentPtr & file_seg, c
fsize,
content_length);
file_seg->setStatus(FileSegment::Status::Complete);
LOG_DEBUG(
LOG_INFO(
log,
"Download s3_key={} to local={} size={} cost={}ms",
s3_key,
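
For reviewers, a minimal smoke-test sketch of the new helper. It is illustrative only: it assumes `downloadToLocal` is made visible to the test translation unit (in this change it is a free function inside FileCache.cpp), and the file path and payload size are made up. Since `Aws::IOStream` is an alias for `std::basic_iostream<char>`, an in-memory `std::stringstream` can stand in for an S3 body.

#include <filesystem>
#include <sstream>

// Illustrative sketch, not part of the change. Assumes downloadToLocal, String,
// WriteLimiterPtr and RUNTIME_CHECK are visible in this translation unit.
void downloadToLocal_smoke_test()
{
    // std::stringstream derives from std::basic_iostream<char>, i.e. Aws::IOStream.
    std::stringstream body;
    body << std::string(64 * 1024, 'x'); // 64 KiB payload, spans several 16 KiB chunks

    const String fname = "/tmp/file_cache_download_test"; // hypothetical path
    // A null write limiter means the writes are not throttled.
    const size_t written = downloadToLocal(body, fname, 64 * 1024, /*write_limiter=*/nullptr);

    RUNTIME_CHECK(written == 64 * 1024);
    RUNTIME_CHECK(std::filesystem::file_size(fname) == written);
}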