Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

#include "velox/connectors/hive/storage_adapters/s3fs/RegisterS3FileSystem.h" // @manual

#include <utility>

#ifdef VELOX_ENABLE_S3
#include "velox/common/base/StatsReporter.h"
#include "velox/connectors/hive/storage_adapters/s3fs/S3Config.h" // @manual
Expand Down Expand Up @@ -47,6 +49,7 @@ FileSystemMap& fileSystems() {
}

CacheKeyFn cacheKeyFunc;
S3FileSystemFactory fileSystemFactory;

std::shared_ptr<FileSystem> fileSystemGenerator(
std::shared_ptr<const config::ConfigBase> properties,
Expand Down Expand Up @@ -86,7 +89,10 @@ std::shared_ptr<FileSystem> fileSystemGenerator(
static_cast<std::optional<std::string>>(
properties->get<std::string>(S3Config::kS3LogLocation));
initializeS3(logLevel, logLocation);
auto fs = std::make_shared<S3FileSystem>(bucketName, properties);
auto fs = fileSystemFactory
? fileSystemFactory(std::move(bucketName), properties)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please check fileSystemFactory is not null before calling it.

: std::make_shared<S3FileSystem>(bucketName, properties);
VELOX_CHECK_NOT_NULL(fs, "S3 file system factory returned nullptr");
instanceMap.insert({cacheKey, fs});
return fs;
Comment on lines 91 to 97
});
Expand All @@ -109,11 +115,14 @@ std::unique_ptr<velox::dwio::common::FileSink> s3WriteFileSinkGenerator(
}
#endif

void registerS3FileSystem(CacheKeyFn identityFunction) {
void registerS3FileSystem(
CacheKeyFn identityFunction,
S3FileSystemFactory fileSystemFactoryInput) {
#ifdef VELOX_ENABLE_S3
fileSystems().withWLock([&](auto& instanceMap) {
if (instanceMap.empty()) {
cacheKeyFunc = identityFunction;
fileSystemFactory = fileSystemFactoryInput;
registerFileSystem(isS3File, std::function(fileSystemGenerator));
dwio::common::FileSink::registerFactory(
std::function(s3WriteFileSinkGenerator));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <functional>
#include <memory>
#include <string>
#include <string_view>

namespace Aws::Auth {
// Forward-declare the AWSCredentialsProvider class from the AWS SDK.
Expand All @@ -31,11 +32,21 @@ class ConfigBase;

namespace facebook::velox::filesystems {

class FileSystem;

using CacheKeyFn = std::function<
std::string(std::shared_ptr<const config::ConfigBase>, std::string_view)>;

// Factory for substituting the FileSystem instance created for an S3 bucket.
// This customizes the filesystem object, not S3FileSystem::Impl.
using S3FileSystemFactory = std::function<std::shared_ptr<FileSystem>(
std::string bucketName,
std::shared_ptr<const config::ConfigBase> config)>;

// Register the S3 filesystem.
void registerS3FileSystem(CacheKeyFn cacheKeyFunc = nullptr);
void registerS3FileSystem(
CacheKeyFn cacheKeyFunc = nullptr,
S3FileSystemFactory fileSystemFactory = nullptr);

void registerS3Metrics();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,25 @@
return config->get<std::string>("hive.s3.endpoint").value();
}

class CustomS3FileSystem : public S3FileSystem {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the customization, do we intend to override S3FileSystem::Impl? If so, we may need to make the relevant members protected.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

S3FileSystem::Impl and impl_ are already protected in S3FileSystem.h, but the actual Impl definition lives in S3FileSystem.cpp, so it remains an internal implementation detail:
https://github.com/facebookincubator/velox/blob/main/velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.h#L90-L92
https://github.com/facebookincubator/velox/blob/main/velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.cpp#L230

The factory hook is intended to customize the FileSystem/S3FileSystem instance, not override Impl.

public:
CustomS3FileSystem(
std::string_view bucketName,
std::shared_ptr<const config::ConfigBase> config)
: S3FileSystem(bucketName, config) {}

Check warning on line 34 in velox/connectors/hive/storage_adapters/s3fs/tests/S3FileSystemRegistrationTest.cpp

View workflow job for this annotation

GitHub Actions / Build with GCC / Linux release with adapters

performance-unnecessary-value-param

parameter 'config' is passed by value and only copied once; consider moving it to avoid unnecessary copies
};

std::shared_ptr<FileSystem> s3FileSystemFactory(
std::string bucketName,

Check warning on line 38 in velox/connectors/hive/storage_adapters/s3fs/tests/S3FileSystemRegistrationTest.cpp

View workflow job for this annotation

GitHub Actions / Build with GCC / Linux release with adapters

performance-unnecessary-value-param

the parameter 'bucketName' is copied for each invocation but only used as a const reference; consider making it a const reference
std::shared_ptr<const config::ConfigBase> config) {

Check warning on line 39 in velox/connectors/hive/storage_adapters/s3fs/tests/S3FileSystemRegistrationTest.cpp

View workflow job for this annotation

GitHub Actions / Build with GCC / Linux release with adapters

performance-unnecessary-value-param

the parameter 'config' is copied for each invocation but only used as a const reference; consider making it a const reference
return std::make_shared<CustomS3FileSystem>(bucketName, config);
}

class S3FileSystemRegistrationTest : public S3Test {
protected:
static void SetUpTestCase() {
memory::MemoryManager::testingSetInstance(memory::MemoryManager::Options{});
filesystems::registerS3FileSystem(cacheKeyFunc);
filesystems::registerS3FileSystem(cacheKeyFunc, s3FileSystemFactory);
}

static void TearDownTestCase() {
Expand Down Expand Up @@ -85,6 +99,13 @@
ASSERT_EQ(s3fs, s3fs_new);
}

TEST_F(S3FileSystemRegistrationTest, customFileSystemFactory) {
auto hiveConfig = minioServer_->hiveConfig();
auto s3fs = filesystems::getFileSystem(kDummyPath, hiveConfig);
auto customS3fs = std::dynamic_pointer_cast<CustomS3FileSystem>(s3fs);
VELOX_CHECK_NOT_NULL(customS3fs);
}

TEST_F(S3FileSystemRegistrationTest, finalize) {
auto hiveConfig = minioServer_->hiveConfig();
auto s3fs = filesystems::getFileSystem(kDummyPath, hiveConfig);
Expand Down
Loading