Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions bolt/common/caching/AsyncDataCache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -682,6 +682,9 @@ void AsyncDataCache::shutdown() {
for (auto& shard : shards_) {
shard->shutdown();
}
if (ssdCache_) {
ssdCache_->shutdown();
}
}

void CacheShard::shutdown() {
Expand Down
63 changes: 40 additions & 23 deletions bolt/common/caching/SsdCache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ SsdCache::SsdCache(
"Ssd path '{}' does not start with '/' that points to local file system.",
filePrefix_);
filesystems::getFileSystem(filePrefix_, nullptr)
->mkdir(std::filesystem::path(filePrefix).parent_path().string());
->mkdir(std::filesystem::path(filePrefix_).parent_path().string());

files_.reserve(numShards_);
// Cache size must be a multiple of this so that each shard has the same max
Expand All @@ -74,7 +74,8 @@ SsdCache::SsdCache(
i,
fileMaxRegions,
checkpointIntervalBytes / numShards,
disableFileCow));
disableFileCow,
executor_));
}
}

Expand All @@ -84,20 +85,19 @@ SsdFile& SsdCache::file(uint64_t fileId) {
}

bool SsdCache::startWrite() {
if (isShutdown_) {
return false;
}
if (writesInProgress_.fetch_add(numShards_) == 0) {
std::lock_guard<std::mutex> l(mutex_);
checkNotShutdownLocked();
if (writesInProgress_ == 0) {
// No write was pending, so now all shards are counted as writing.
writesInProgress_ += numShards_;
return true;
}
// There were writes in progress, so compensate for the increment.
writesInProgress_.fetch_sub(numShards_);
BOLT_CHECK_GE(writesInProgress_, 0);
return false;
}

void SsdCache::write(std::vector<CachePin> pins) {
BOLT_CHECK_LE(numShards_, writesInProgress_);
BOLT_CHECK_EQ(numShards_, writesInProgress_);

BOLT_TEST_ADJUST("bytedance::bolt::cache::SsdCache::write", this);

Expand Down Expand Up @@ -163,12 +163,11 @@ bool SsdCache::removeFileEntries(
success &= files_[i]->removeFileEntries(filesToRemove, filesRetained);
} catch (const std::exception& e) {
BOLT_SSD_CACHE_LOG(ERROR) << "Error removing file entries from SSD shard "
<< files_[i]->shardId() << ": " << e.what();
<< files_[i]->shardId() << ": " << e.what();
success = false;
}
--writesInProgress_;
}

return success;
}

Expand All @@ -180,12 +179,6 @@ SsdCacheStats SsdCache::stats() const {
return stats;
}

void SsdCache::clear() {
for (auto& file : files_) {
file->clear();
}
}

std::string SsdCache::toString() const {
auto data = stats();
uint64_t capacity = maxBytes();
Expand All @@ -198,20 +191,44 @@ std::string SsdCache::toString() const {
return out.str();
}

void SsdCache::testingDeleteFiles() {
for (auto& file : files_) {
file->deleteFile();
void SsdCache::shutdown() {
{
std::lock_guard<std::mutex> l(mutex_);
if (shutdown_) {
BOLT_SSD_CACHE_LOG(INFO) << "SSD cache has already been shutdown";
}
shutdown_ = true;
}
}

void SsdCache::shutdown() {
isShutdown_ = true;
BOLT_SSD_CACHE_LOG(INFO) << "SSD cache is shutting down";
while (writesInProgress_) {
std::this_thread::sleep_for(std::chrono::milliseconds(100)); // NOLINT
}
for (auto& file : files_) {
file->checkpoint(true);
}
BOLT_SSD_CACHE_LOG(INFO) << "SSD cache has been shutdown";
}

void SsdCache::testingClear() {
for (auto& file : files_) {
file->testingClear();
}
}

void SsdCache::testingDeleteFiles() {
for (auto& file : files_) {
file->testingDeleteFile();
}
}

uint64_t SsdCache::testingTotalLogEvictionFilesSize() {
uint64_t size = 0;
for (auto& file : files_) {
std::filesystem::path p{file->getEvictLogFilePath()};
size += std::filesystem::file_size(p);
}
return size;
}

} // namespace bytedance::bolt::cache
26 changes: 20 additions & 6 deletions bolt/common/caching/SsdCache.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,29 +106,43 @@ class SsdCache {
/// Drops all entries. Outstanding pins become invalid but reading them will
/// mostly succeed since the files will not be rewritten until new content is
/// stored.
void clear();
void testingClear();

/// Deletes backing files. Used in testing.
void testingDeleteFiles();

/// Returns the total size of eviction log files. Used in testing.
uint64_t testingTotalLogEvictionFilesSize();

/// Stops writing to the cache files and waits for pending writes to finish.
/// If checkpointing is on, makes a checkpoint.
void shutdown();

std::string toString() const;

/// Returns a reference to the path prefix stored in 'filePrefix_'. The
/// reference stays valid for the lifetime of this SsdCache.
const std::string& filePrefix() const { return filePrefix_; }

private:
void checkNotShutdownLocked() {
BOLT_CHECK(
!shutdown_, "Unexpected write after SSD cache has been shutdown");
}

const std::string filePrefix_;
const int32_t numShards_;

// Stats for selecting entries to save from AsyncDataCache.
const std::unique_ptr<FileGroupStats> groupStats_;
folly::Executor* const executor_;
mutable std::mutex mutex_;

std::vector<std::unique_ptr<SsdFile>> files_;

// Count of shards with unfinished writes.
std::atomic<int32_t> writesInProgress_{0};

// Stats for selecting entries to save from AsyncDataCache.
std::unique_ptr<FileGroupStats> groupStats_;
folly::Executor* executor_;
std::atomic<bool> isShutdown_{false};
bool shutdown_{false};
};

} // namespace bytedance::bolt::cache
Loading
Loading