diff --git a/cpp/include/milvus-storage/common/fs_util.h b/cpp/include/milvus-storage/common/fs_util.h index 67be4288..7c256c92 100644 --- a/cpp/include/milvus-storage/common/fs_util.h +++ b/cpp/include/milvus-storage/common/fs_util.h @@ -18,7 +18,7 @@ #include "result.h" namespace milvus_storage { -Result> BuildFileSystem(const std::string& uri, std::string* out_path = nullptr); +Result> BuildFileSystem(const std::string& uri, std::string* out_path = nullptr); std::string UriToPath(const std::string& uri); diff --git a/cpp/include/milvus-storage/common/opendal_fs.h b/cpp/include/milvus-storage/common/opendal_fs.h index 85a4262e..506f1165 100644 --- a/cpp/include/milvus-storage/common/opendal_fs.h +++ b/cpp/include/milvus-storage/common/opendal_fs.h @@ -67,7 +67,7 @@ class OpendalFileSystem : public arrow::fs::FileSystem { const std::string& path, const std::shared_ptr& metadata = {}) override; /// Create a S3FileSystem instance from the given options. - static arrow::Result> Make( + static arrow::Result> Make( const OpendalOptions& options, const arrow::io::IOContext& = arrow::io::default_io_context()); protected: diff --git a/cpp/src/common/fs_util.cpp b/cpp/src/common/fs_util.cpp index 537cf688..2fda6490 100644 --- a/cpp/src/common/fs_util.cpp +++ b/cpp/src/common/fs_util.cpp @@ -16,24 +16,56 @@ #include #include #include +#include "arrow/filesystem/gcsfs.h" +#include #include #include +#include #include "common/log.h" #include "common/macro.h" #include "common/opendal_fs.h" +#include "arrow/filesystem/azurefs.h" namespace milvus_storage { -Result> BuildFileSystem(const std::string& uri, std::string* out_path) { +Result> BuildFileSystem(const std::string& uri, std::string* out_path) { arrow::internal::Uri uri_parser; RETURN_ARROW_NOT_OK(uri_parser.Parse(uri)); auto scheme = uri_parser.scheme(); + if (scheme == "file") { if (out_path == nullptr) { return Status::InvalidArgument("out_path should not be nullptr if scheme is file"); } ASSIGN_OR_RETURN_ARROW_NOT_OK(auto option, arrow::fs::LocalFileSystemOptions::FromUri(uri_parser, out_path)); - return std::unique_ptr(new arrow::fs::LocalFileSystem(option)); + return std::shared_ptr(new arrow::fs::LocalFileSystem(option)); + } else if (scheme == "opendal") { + ASSIGN_OR_RETURN_ARROW_NOT_OK(auto option, OpendalOptions::FromUri(uri_parser, out_path)); + ASSIGN_OR_RETURN_ARROW_NOT_OK(auto fs, OpendalFileSystem::Make(option)); + return std::dynamic_pointer_cast(fs); + } else if (scheme == "gs") { + ASSIGN_OR_RETURN_ARROW_NOT_OK(auto option, arrow::fs::GcsOptions::FromUri(uri_parser, out_path)); + auto fs = arrow::fs::GcsFileSystem::Make(option); + return std::dynamic_pointer_cast(fs); + } else if (scheme == "azure") { + // FIXME: arrow does not support to create azurefs from uri for now + arrow::fs::AzureOptions options; + ASSIGN_OR_RETURN_ARROW_NOT_OK(auto fs, arrow::fs::AzureFileSystem::Make(options)); + return std::dynamic_pointer_cast(fs); + } else if (scheme == "s3") { + if (!arrow::fs::IsS3Initialized()) { + RETURN_ARROW_NOT_OK(arrow::fs::EnsureS3Initialized()); + std::atexit([]() { + auto status = arrow::fs::EnsureS3Finalized(); + if (!status.ok()) { + LOG_STORAGE_WARNING_ << "Failed to finalize S3: " << status.message(); + } + }); + } + ASSIGN_OR_RETURN_ARROW_NOT_OK(auto option, arrow::fs::S3Options::FromUri(uri_parser, out_path)); + ASSIGN_OR_RETURN_ARROW_NOT_OK(auto fs, arrow::fs::S3FileSystem::Make(option)); + + return std::shared_ptr(fs); } // if (schema == "hdfs") { @@ -42,28 +74,6 @@ Result> BuildFileSystem(const std::string // return std::shared_ptr(fs); // } - if (scheme == "opendal") { - ASSIGN_OR_RETURN_ARROW_NOT_OK(auto option, OpendalOptions::FromUri(uri_parser, out_path)); - ASSIGN_OR_RETURN_ARROW_NOT_OK(auto fs, OpendalFileSystem::Make(option)); - return std::unique_ptr(std::move(fs)); - } - - // if (schema == "s3") { - // if (!arrow::fs::IsS3Initialized()) { - // RETURN_ARROW_NOT_OK(arrow::fs::EnsureS3Initialized()); - // std::atexit([]() { - // auto status = arrow::fs::EnsureS3Finalized(); - // if (!status.ok()) { - // LOG_STORAGE_WARNING_ << "Failed to finalize S3: " << status.message(); - // } - // }); - // } - // ASSIGN_OR_RETURN_ARROW_NOT_OK(auto option, arrow::fs::S3Options::FromUri(uri_parser, out_path)); - // ASSIGN_OR_RETURN_ARROW_NOT_OK(auto fs, arrow::fs::S3FileSystem::Make(option)); - // - // return std::shared_ptr(fs); - // } - // return Status::InvalidArgument("Unsupported schema: " + scheme); } /** diff --git a/cpp/src/common/opendal_fs.cpp b/cpp/src/common/opendal_fs.cpp index 7a1ee416..5f14e7e1 100644 --- a/cpp/src/common/opendal_fs.cpp +++ b/cpp/src/common/opendal_fs.cpp @@ -166,9 +166,9 @@ arrow::Result> read(arrow::io::IOContext& io_c class OpendalOutputStream : public arrow::io::OutputStream {}; -arrow::Result> OpendalFileSystem::Make(const OpendalOptions& options, +arrow::Result> OpendalFileSystem::Make(const OpendalOptions& options, const arrow::io::IOContext& ctx) { - auto fs = std::unique_ptr(new OpendalFileSystem(options, ctx)); + auto fs = std::shared_ptr(new OpendalFileSystem(options, ctx)); opendal_operator_options* op_options_ = opendal_operator_options_new(); for (auto& option : options.options()) { opendal_operator_options_set(op_options_, option.first.c_str(), option.second.c_str());