Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[GLUTEN-8891][VL] Allow to reuse local SSD cache on Spark context restart #8892

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,13 @@ object VeloxConfig {
.booleanConf
.createWithDefault(false)

val COLUMNAR_VELOX_SSD_REUSE =
buildStaticConf("spark.gluten.sql.columnar.backend.velox.ssdReuse")
.internal()
.doc("The flag for cache resue")
.booleanConf
.createWithDefault(false)

val COLUMNAR_VELOX_CONNECTOR_IO_THREADS =
buildStaticConf("spark.gluten.sql.columnar.backend.velox.IOThreads")
.internal()
Expand Down
1 change: 1 addition & 0 deletions cpp/velox/compute/VeloxBackend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ void VeloxBackend::initCache() {
int32_t ssdCacheIOThreads = backendConf_->get<int32_t>(kVeloxSsdCacheIOThreads, kVeloxSsdCacheIOThreadsDefault);
std::string ssdCachePathPrefix = backendConf_->get<std::string>(kVeloxSsdCachePath, kVeloxSsdCachePathDefault);

ssdReuse_ = backendConf_->get<bool>(kVeloxSsdResue, false);
cachePathPrefix_ = ssdCachePathPrefix;
cacheFilePrefix_ = getCacheFilePrefix();
std::string ssdCachePath = ssdCachePathPrefix + "/" + cacheFilePrefix_;
Expand Down
14 changes: 10 additions & 4 deletions cpp/velox/compute/VeloxBackend.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,12 @@ class VeloxBackend {
~VeloxBackend() {
if (dynamic_cast<facebook::velox::cache::AsyncDataCache*>(asyncDataCache_.get())) {
LOG(INFO) << asyncDataCache_->toString();
for (const auto& entry : std::filesystem::directory_iterator(cachePathPrefix_)) {
if (entry.path().filename().string().find(cacheFilePrefix_) != std::string::npos) {
LOG(INFO) << "Removing cache file " << entry.path().filename().string();
std::filesystem::remove(cachePathPrefix_ + "/" + entry.path().filename().string());
if (!ssdReuse_) {
for (const auto& entry : std::filesystem::directory_iterator(cachePathPrefix_)) {
if (entry.path().filename().string().find(cacheFilePrefix_) != std::string::npos) {
LOG(INFO) << "Removing cache file " << entry.path().filename().string();
std::filesystem::remove(cachePathPrefix_ + "/" + entry.path().filename().string());
}
}
}
asyncDataCache_->shutdown();
Expand Down Expand Up @@ -79,6 +81,9 @@ class VeloxBackend {
void initJolFilesystem();

std::string getCacheFilePrefix() {
if (ssdReuse_) {
return "cache.";
}
return "cache." + boost::lexical_cast<std::string>(boost::uuids::random_generator()()) + ".";
}

Expand All @@ -91,6 +96,7 @@ class VeloxBackend {
std::unique_ptr<folly::IOThreadPoolExecutor> ioExecutor_;
std::shared_ptr<facebook::velox::memory::MmapAllocator> cacheAllocator_;

bool ssdReuse_;
std::string cachePathPrefix_;
std::string cacheFilePrefix_;

Expand Down
1 change: 1 addition & 0 deletions cpp/velox/config/VeloxConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ const uint32_t kVeloxSsdCacheShardsDefault = 1;
const std::string kVeloxSsdCacheIOThreads = "spark.gluten.sql.columnar.backend.velox.ssdCacheIOThreads";
const uint32_t kVeloxSsdCacheIOThreadsDefault = 1;
const std::string kVeloxSsdODirectEnabled = "spark.gluten.sql.columnar.backend.velox.ssdODirect";
const std::string kVeloxSsdResue = "spark.gluten.sql.columnar.backend.velox.ssdReuse";

// async
const std::string kVeloxIOThreads = "spark.gluten.sql.columnar.backend.velox.IOThreads";
Expand Down
Loading