Skip to content

Commit

Permalink
[Cpp]: Add more fs support
Browse files Browse the repository at this point in the history
Signed-off-by: sunby <[email protected]>
  • Loading branch information
sunby committed Jan 11, 2024
1 parent 641e4fd commit d0551b7
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 28 deletions.
2 changes: 1 addition & 1 deletion cpp/include/milvus-storage/common/fs_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
#include "result.h"
namespace milvus_storage {

Result<std::unique_ptr<arrow::fs::FileSystem>> BuildFileSystem(const std::string& uri, std::string* out_path = nullptr);
Result<std::shared_ptr<arrow::fs::FileSystem>> BuildFileSystem(const std::string& uri, std::string* out_path = nullptr);

std::string UriToPath(const std::string& uri);

Expand Down
2 changes: 1 addition & 1 deletion cpp/include/milvus-storage/common/opendal_fs.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ class OpendalFileSystem : public arrow::fs::FileSystem {
const std::string& path, const std::shared_ptr<const arrow::KeyValueMetadata>& metadata = {}) override;

/// Create an OpendalFileSystem instance from the given options.
static arrow::Result<std::unique_ptr<OpendalFileSystem>> Make(
static arrow::Result<std::shared_ptr<OpendalFileSystem>> Make(
const OpendalOptions& options, const arrow::io::IOContext& = arrow::io::default_io_context());

protected:
Expand Down
58 changes: 34 additions & 24 deletions cpp/src/common/fs_util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,56 @@
#include <arrow/filesystem/localfs.h>
#include <arrow/filesystem/hdfs.h>
#include <arrow/filesystem/s3fs.h>
#include "arrow/filesystem/gcsfs.h"
#include <arrow/filesystem/type_fwd.h>
#include <arrow/util/uri.h>
#include <cstdlib>
#include <memory>
#include "common/log.h"
#include "common/macro.h"
#include "common/opendal_fs.h"
#include "arrow/filesystem/azurefs.h"

namespace milvus_storage {

Result<std::unique_ptr<arrow::fs::FileSystem>> BuildFileSystem(const std::string& uri, std::string* out_path) {
Result<std::shared_ptr<arrow::fs::FileSystem>> BuildFileSystem(const std::string& uri, std::string* out_path) {
arrow::internal::Uri uri_parser;
RETURN_ARROW_NOT_OK(uri_parser.Parse(uri));
auto scheme = uri_parser.scheme();

if (scheme == "file") {
if (out_path == nullptr) {
return Status::InvalidArgument("out_path should not be nullptr if scheme is file");
}
ASSIGN_OR_RETURN_ARROW_NOT_OK(auto option, arrow::fs::LocalFileSystemOptions::FromUri(uri_parser, out_path));
return std::unique_ptr<arrow::fs::FileSystem>(new arrow::fs::LocalFileSystem(option));
return std::shared_ptr<arrow::fs::FileSystem>(new arrow::fs::LocalFileSystem(option));
} else if (scheme == "opendal") {
ASSIGN_OR_RETURN_ARROW_NOT_OK(auto option, OpendalOptions::FromUri(uri_parser, out_path));
ASSIGN_OR_RETURN_ARROW_NOT_OK(auto fs, OpendalFileSystem::Make(option));
return std::dynamic_pointer_cast<arrow::fs::FileSystem>(fs);
} else if (scheme == "gs") {
ASSIGN_OR_RETURN_ARROW_NOT_OK(auto option, arrow::fs::GcsOptions::FromUri(uri_parser, out_path));
auto fs = arrow::fs::GcsFileSystem::Make(option);
return std::dynamic_pointer_cast<arrow::fs::FileSystem>(fs);
} else if (scheme == "azure") {
// FIXME: Arrow does not yet support creating an AzureFileSystem from a URI, so default-constructed AzureOptions are used here.
arrow::fs::AzureOptions options;
ASSIGN_OR_RETURN_ARROW_NOT_OK(auto fs, arrow::fs::AzureFileSystem::Make(options));
return std::dynamic_pointer_cast<arrow::fs::FileSystem>(fs);
} else if (scheme == "s3") {
if (!arrow::fs::IsS3Initialized()) {
RETURN_ARROW_NOT_OK(arrow::fs::EnsureS3Initialized());
std::atexit([]() {
auto status = arrow::fs::EnsureS3Finalized();
if (!status.ok()) {
LOG_STORAGE_WARNING_ << "Failed to finalize S3: " << status.message();
}
});
}
ASSIGN_OR_RETURN_ARROW_NOT_OK(auto option, arrow::fs::S3Options::FromUri(uri_parser, out_path));
ASSIGN_OR_RETURN_ARROW_NOT_OK(auto fs, arrow::fs::S3FileSystem::Make(option));

return std::shared_ptr<arrow::fs::FileSystem>(fs);
}

// if (schema == "hdfs") {
Expand All @@ -42,28 +74,6 @@ Result<std::unique_ptr<arrow::fs::FileSystem>> BuildFileSystem(const std::string
// return std::shared_ptr<arrow::fs::FileSystem>(fs);
// }

if (scheme == "opendal") {
ASSIGN_OR_RETURN_ARROW_NOT_OK(auto option, OpendalOptions::FromUri(uri_parser, out_path));
ASSIGN_OR_RETURN_ARROW_NOT_OK(auto fs, OpendalFileSystem::Make(option));
return std::unique_ptr<arrow::fs::FileSystem>(std::move(fs));
}

// if (schema == "s3") {
// if (!arrow::fs::IsS3Initialized()) {
// RETURN_ARROW_NOT_OK(arrow::fs::EnsureS3Initialized());
// std::atexit([]() {
// auto status = arrow::fs::EnsureS3Finalized();
// if (!status.ok()) {
// LOG_STORAGE_WARNING_ << "Failed to finalize S3: " << status.message();
// }
// });
// }
// ASSIGN_OR_RETURN_ARROW_NOT_OK(auto option, arrow::fs::S3Options::FromUri(uri_parser, out_path));
// ASSIGN_OR_RETURN_ARROW_NOT_OK(auto fs, arrow::fs::S3FileSystem::Make(option));
//
// return std::shared_ptr<arrow::fs::FileSystem>(fs);
// }
//
return Status::InvalidArgument("Unsupported schema: " + scheme);
}
/**
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/common/opendal_fs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -166,9 +166,9 @@ arrow::Result<std::shared_ptr<OpendalInputFile>> read(arrow::io::IOContext& io_c

class OpendalOutputStream : public arrow::io::OutputStream {};

arrow::Result<std::unique_ptr<OpendalFileSystem>> OpendalFileSystem::Make(const OpendalOptions& options,
arrow::Result<std::shared_ptr<OpendalFileSystem>> OpendalFileSystem::Make(const OpendalOptions& options,
const arrow::io::IOContext& ctx) {
auto fs = std::unique_ptr<OpendalFileSystem>(new OpendalFileSystem(options, ctx));
auto fs = std::shared_ptr<OpendalFileSystem>(new OpendalFileSystem(options, ctx));
opendal_operator_options* op_options_ = opendal_operator_options_new();
for (auto& option : options.options()) {
opendal_operator_options_set(op_options_, option.first.c_str(), option.second.c_str());
Expand Down

0 comments on commit d0551b7

Please sign in to comment.