From a6a00c8066843e11eeff231055f3cbb058d90148 Mon Sep 17 00:00:00 2001
From: Enwei Jiao
Date: Wed, 15 Nov 2023 10:30:12 +0800
Subject: [PATCH] Add opendal fs implement (#91)

---
 cpp/CMakeLists.txt                            |  15 +-
 cpp/cmake/libopendal.cmake                    |  60 ++++
 cpp/conanfile.py                              |   8 +-
 .../milvus-storage/common/opendal_fs.h        |  80 +++++
 cpp/src/common/fs_util.cpp                    |  36 ++-
 cpp/src/common/opendal_fs.cpp                 | 333 ++++++++++++++++++
 6 files changed, 510 insertions(+), 22 deletions(-)
 create mode 100644 cpp/cmake/libopendal.cmake
 create mode 100644 cpp/include/milvus-storage/common/opendal_fs.h
 create mode 100644 cpp/src/common/opendal_fs.cpp

diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt
index f7bfddd..cebb9a6 100644
--- a/cpp/CMakeLists.txt
+++ b/cpp/CMakeLists.txt
@@ -8,18 +8,25 @@ option(WITH_ASAN "Build with address sanitizer." OFF)
 set(CMAKE_CXX_STANDARD 17)
 set(CMAKE_EXPORT_COMPILE_COMMANDS ON)
 
+list(APPEND CMAKE_MODULE_PATH "${CMAKE_CURRENT_SOURCE_DIR}/cmake")
+include(libopendal)
+
 find_package(Boost REQUIRED)
 find_package(Arrow REQUIRED)
 find_package(protobuf REQUIRED)
 find_package(glog REQUIRED)
-find_package(AWSSDK REQUIRED)
 
 file(GLOB_RECURSE SRC_FILES src/*.cpp src/*.cc)
-message(STATUS "SRC_FILES: ${SRC_FILES}")
 add_library(milvus-storage ${SRC_FILES})
 target_include_directories(milvus-storage PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/include/milvus-storage ${CMAKE_CURRENT_SOURCE_DIR}/src)
-target_link_libraries(milvus-storage PUBLIC arrow::libarrow arrow::libparquet Boost::boost protobuf::protobuf aws-sdk-cpp::aws-sdk-cpp)
-target_link_libraries(milvus-storage PUBLIC glog::glog)
+target_link_libraries(milvus-storage PUBLIC
+  arrow::libarrow
+  arrow::libparquet
+  Boost::boost
+  protobuf::protobuf
+  glog::glog
+  opendal
+)
 
 if (WITH_UT)
   enable_testing()
diff --git a/cpp/cmake/libopendal.cmake b/cpp/cmake/libopendal.cmake
new file mode 100644
index 0000000..2040918
--- /dev/null
+++ b/cpp/cmake/libopendal.cmake
@@ -0,0 +1,60 @@
+# Builds the OpenDAL C binding with cargo and exposes it as the imported static target `opendal`.
+function(build_opendal)
+    include(ExternalProject)
+    set(OPENDAL_NAME "libopendal_c${CMAKE_STATIC_LIBRARY_SUFFIX}")
+    set(OPENDAL_PREFIX ${CMAKE_BINARY_DIR}/thirdparty/opendal_ep)
+
+    file(MAKE_DIRECTORY
+        "${OPENDAL_PREFIX}"
+        "${OPENDAL_PREFIX}/lib"
+        "${OPENDAL_PREFIX}/include"
+    )
+
+    # cargo builds the debug profile by default and rejects an explicit
+    # `--debug` flag, so only the release profile is passed as a flag.
+    if (CMAKE_BUILD_TYPE STREQUAL "Debug")
+        set(OPENDAL_BUILD_TYPE "debug")
+        set(OPENDAL_CARGO_FLAGS "")
+    else()
+        set(OPENDAL_BUILD_TYPE "release")
+        set(OPENDAL_CARGO_FLAGS "--release")
+    endif()
+
+    ExternalProject_Add(
+        opendal_ep
+        GIT_REPOSITORY https://github.com/apache/incubator-opendal.git
+        GIT_TAG main
+        PREFIX ${OPENDAL_PREFIX}
+        SOURCE_SUBDIR bindings/c
+        CONFIGURE_COMMAND echo "configure for opendal_ep"
+        BUILD_COMMAND cargo build ${OPENDAL_CARGO_FLAGS}
+        BUILD_IN_SOURCE 1
+        INSTALL_COMMAND bash -c "cp ${OPENDAL_PREFIX}/src/opendal_ep/target/${OPENDAL_BUILD_TYPE}/${OPENDAL_NAME} ${OPENDAL_PREFIX}/lib/ && cp ${OPENDAL_PREFIX}/src/opendal_ep/bindings/c/include/opendal.h ${OPENDAL_PREFIX}/include/")
+
+
+    add_library(opendal STATIC IMPORTED)
+    set_target_properties(opendal
+        PROPERTIES
+        IMPORTED_GLOBAL TRUE
+        IMPORTED_LOCATION "${OPENDAL_PREFIX}/lib/${OPENDAL_NAME}"
+        INTERFACE_INCLUDE_DIRECTORIES "${OPENDAL_PREFIX}/include")
+    add_dependencies(opendal opendal_ep)
+    if(APPLE)
+        target_link_libraries(opendal INTERFACE "-framework CoreFoundation")
+        target_link_libraries(opendal INTERFACE "-framework Security")
+        target_link_libraries(opendal INTERFACE "-framework SystemConfiguration")
+    endif()
+
+    get_target_property(OPENDAL_IMPORTED_LOCATION opendal IMPORTED_LOCATION)
+    get_target_property(OPENDAL_INTERFACE_INCLUDE_DIRECTORIES opendal INTERFACE_INCLUDE_DIRECTORIES)
+    message("OPENDAL_IMPORTED_LOCATION: ${OPENDAL_IMPORTED_LOCATION}")
+    message("OPENDAL_INTERFACE_INCLUDE_DIRECTORIES: ${OPENDAL_INTERFACE_INCLUDE_DIRECTORIES}")
+endfunction()
+
+if (opendal_FOUND)
+    message("Found opendal: ${opendal_INCLUDE_DIRS}")
+else()
+    build_opendal()
+endif()
+
+
diff --git a/cpp/conanfile.py b/cpp/conanfile.py
index fe0090c..9fb8391 100644
--- a/cpp/conanfile.py
+++ b/cpp/conanfile.py
@@ -33,10 +33,10 @@ class StorageConan(ConanFile):
         "with_asan": False,
         "with_profiler": False,
         "with_ut": True,
-        "arrow:with_s3": True,
-        "aws-sdk-cpp:config": True,
-        "aws-sdk-cpp:text-to-speech": False,
-        "aws-sdk-cpp:transfer": False,
+        # "arrow:with_s3": True,
+        # "aws-sdk-cpp:config": True,
+        # "aws-sdk-cpp:text-to-speech": False,
+        # "aws-sdk-cpp:transfer": False,
         "arrow:filesystem_layer": True,
         "arrow:dataset_modules": True,
         "arrow:parquet": True,
diff --git a/cpp/include/milvus-storage/common/opendal_fs.h b/cpp/include/milvus-storage/common/opendal_fs.h
new file mode 100644
index 0000000..d823f10
--- /dev/null
+++ b/cpp/include/milvus-storage/common/opendal_fs.h
@@ -0,0 +1,80 @@
+#pragma once
+
+#include <arrow/filesystem/filesystem.h>
+#include <arrow/result.h>
+#include <arrow/status.h>
+#include <arrow/util/uri.h>
+#include <memory>
+#include <stdexcept>
+#include <string>
+#include <unordered_map>
+#include <vector>
+#include "opendal.h"
+
+namespace milvus_storage {
+
+/// Key/value configuration handed to the OpenDAL operator.
+class OpendalOptions {
+ public:
+  /// Collect the query items of `uri` as options; `out_path`, when non-null,
+  /// receives "<host><path>" with any trailing slash removed.
+  static arrow::Result<OpendalOptions> FromUri(const arrow::internal::Uri& uri, std::string* out_path);
+
+  const std::unordered_map<std::string, std::string>& options() const { return options_; }
+
+  /// Throws std::out_of_range when `key` is absent.
+  const std::string& at(const std::string& key) const { return options_.at(key); }
+
+ protected:
+  std::unordered_map<std::string, std::string> options_;
+};
+
+/// arrow::fs::FileSystem implementation backed by an OpenDAL operator.
+class OpendalFileSystem : public arrow::fs::FileSystem {
+ public:
+  ~OpendalFileSystem() override;
+
+  std::string type_name() const override { return "opendal"; }
+
+  bool Equals(const FileSystem& other) const override;
+
+  arrow::Result<arrow::fs::FileInfo> GetFileInfo(const std::string& path) override;
+  arrow::Result<std::vector<arrow::fs::FileInfo>> GetFileInfo(const arrow::fs::FileSelector& select) override;
+  arrow::fs::FileInfoGenerator GetFileInfoGenerator(const arrow::fs::FileSelector& select) override {
+    throw std::runtime_error("Not implemented");
+  }
+
+  arrow::Status CreateDir(const std::string& path, bool recursive = true) override;
+
+  arrow::Status DeleteDir(const std::string& path) override;
+  arrow::Status DeleteDirContents(const std::string& path, bool missing_dir_ok = false) override;
+  arrow::Status DeleteRootDirContents() override { return arrow::Status::NotImplemented("Not implemented"); }
+
+  arrow::Status DeleteFile(const std::string& path) override;
+
+  arrow::Status Move(const std::string& src, const std::string& dest) override;
+
+  arrow::Status CopyFile(const std::string& src, const std::string& dest) override;
+
+  arrow::Result<std::shared_ptr<arrow::io::InputStream>> OpenInputStream(const std::string& path) override;
+  arrow::Result<std::shared_ptr<arrow::io::RandomAccessFile>> OpenInputFile(const std::string& path) override;
+
+  arrow::Result<std::shared_ptr<arrow::io::OutputStream>> OpenOutputStream(
+      const std::string& path, const std::shared_ptr<const arrow::KeyValueMetadata>& metadata = {}) override;
+
+  arrow::Result<std::shared_ptr<arrow::io::OutputStream>> OpenAppendStream(
+      const std::string& path, const std::shared_ptr<const arrow::KeyValueMetadata>& metadata = {}) override;
+
+  /// Create an OpendalFileSystem instance from the given options.
+  /// The options must contain a "scheme" entry naming the OpenDAL service.
+  static arrow::Result<std::shared_ptr<OpendalFileSystem>> Make(
+      const OpendalOptions& options, const arrow::io::IOContext& = arrow::io::default_io_context());
+
+ protected:
+  OpendalFileSystem(const OpendalOptions& options, const arrow::io::IOContext& io_context);
+  // Owned; stays null until Make() succeeds so the destructor can skip it.
+  opendal_operator* operator_ = nullptr;
+  OpendalOptions options_;
+};
+
+}  // namespace milvus_storage
diff --git a/cpp/src/common/fs_util.cpp b/cpp/src/common/fs_util.cpp
index 72955b3..85b023f 100644
--- a/cpp/src/common/fs_util.cpp
+++ b/cpp/src/common/fs_util.cpp
@@ -6,6 +6,8 @@
 #include <cstdlib>
 #include "common/log.h"
 #include "common/macro.h"
+#include "common/opendal_fs.h"
+
 namespace milvus_storage {
 
 Result<std::shared_ptr<arrow::fs::FileSystem>> BuildFileSystem(const std::string& uri, std::string* out_path) {
@@ -26,22 +28,28 @@ Result<std::shared_ptr<arrow::fs::FileSystem>> BuildFileSystem(const std::string
   //   return std::shared_ptr<arrow::fs::FileSystem>(fs);
   // }
 
-  if (schema == "s3") {
-    if (!arrow::fs::IsS3Initialized()) {
-      RETURN_ARROW_NOT_OK(arrow::fs::EnsureS3Initialized());
-      std::atexit([]() {
-        auto status = arrow::fs::EnsureS3Finalized();
-        if (!status.ok()) {
-          LOG_STORAGE_WARNING_ << "Failed to finalize S3: " << status.message();
-        }
-      });
-    }
-    ASSIGN_OR_RETURN_ARROW_NOT_OK(auto option, arrow::fs::S3Options::FromUri(uri_parser, out_path));
-    ASSIGN_OR_RETURN_ARROW_NOT_OK(auto fs, arrow::fs::S3FileSystem::Make(option));
-
+  if (schema == "opendal") {
+    ASSIGN_OR_RETURN_ARROW_NOT_OK(auto option, OpendalOptions::FromUri(uri_parser, out_path));
+    ASSIGN_OR_RETURN_ARROW_NOT_OK(auto fs, OpendalFileSystem::Make(option));
     return std::shared_ptr<arrow::fs::FileSystem>(fs);
   }
 
+  // if (schema == "s3") {
+  //   if (!arrow::fs::IsS3Initialized()) {
+  //     RETURN_ARROW_NOT_OK(arrow::fs::EnsureS3Initialized());
+  //     std::atexit([]() {
+  //       auto status = arrow::fs::EnsureS3Finalized();
+  //       if (!status.ok()) {
+  //         LOG_STORAGE_WARNING_ << "Failed to finalize S3: " << status.message();
+  //       }
+  //     });
+  //   }
+  //   ASSIGN_OR_RETURN_ARROW_NOT_OK(auto option, arrow::fs::S3Options::FromUri(uri_parser, out_path));
+  //   ASSIGN_OR_RETURN_ARROW_NOT_OK(auto fs, arrow::fs::S3FileSystem::Make(option));
+  //
+  //   return std::shared_ptr<arrow::fs::FileSystem>(fs);
+  // }
+  //
   return Status::InvalidArgument("Unsupported schema: " + schema);
 }
 /**
@@ -57,4 +65,4 @@ std::string UriToPath(const std::string& uri) {
     return std::string("");
   }
 }
-};  // namespace milvus_storage
\ No newline at end of file
+};  // namespace milvus_storage
diff --git a/cpp/src/common/opendal_fs.cpp b/cpp/src/common/opendal_fs.cpp
new file mode 100644
index 0000000..139aa98
--- /dev/null
+++ b/cpp/src/common/opendal_fs.cpp
@@ -0,0 +1,333 @@
+#include "common/opendal_fs.h"
+#include <arrow/buffer.h>
+#include <arrow/filesystem/path_util.h>
+#include <arrow/io/interfaces.h>
+#include <arrow/result.h>
+#include <arrow/status.h>
+#include <arrow/util/logging.h>
+#include <arrow/util/uri.h>
+#include <algorithm>
+#include <chrono>
+#include <cstdint>
+#include <memory>
+#include <string>
+#include <vector>
+#include "common/status.h"
+
+namespace milvus_storage {
+
+// Copy an OpenDAL byte slice into a std::string.
+std::string ToString(const opendal_bytes& bs) { return {reinterpret_cast<const char*>(bs.data), bs.len}; }
+
+// Derive the object path ("<host><path>") and the operator options (the URI query items) from `uri`.
+arrow::Result<OpendalOptions> OpendalOptions::FromUri(const arrow::internal::Uri& uri, std::string* out_path) {
+  OpendalOptions options;
+  const auto bucket = uri.host();
+  auto path = uri.path();
+  if (bucket.empty()) {
+    if (!path.empty()) {
+      return arrow::Status::Invalid("Missing bucket name in Opendal URI");
+    }
+  } else {
+    if (path.empty()) {
+      path = bucket;
+    } else {
+      if (path[0] != '/') {
+        return arrow::Status::Invalid("Opendal URI should be absolute, not relative");
+      }
+      path = bucket + path;
+    }
+  }
+  if (out_path != nullptr) {
+    *out_path = std::string(arrow::fs::internal::RemoveTrailingSlash(path));
+  }
+
+  ARROW_ASSIGN_OR_RAISE(const auto options_items, uri.query_items());
+  for (const auto& kv : options_items) {
+    options.options_.emplace(kv.first, kv.second);
+  }
+  return options;
+}
+
+// Sequential reader over one OpenDAL object. Takes ownership of `reader` and `metadata`.
+class OpendalInputFile : public arrow::io::RandomAccessFile {
+ public:
+  explicit OpendalInputFile(arrow::io::IOContext io_context, opendal_reader* reader, opendal_metadata* metadata)
+      : io_context_(io_context), reader_(reader), metadata_(metadata) {
+    content_length_ = opendal_metadata_content_length(metadata_);
+  }
+
+  ~OpendalInputFile() override {
+    opendal_reader_free(reader_);
+    opendal_metadata_free(metadata_);
+  }
+
+  arrow::Status Close() override {
+    closed_ = true;
+    return arrow::Status::OK();
+  }
+
+  bool closed() const override { return closed_; }
+
+  arrow::Status CheckClosed() const {
+    if (closed_) {
+      return arrow::Status::Invalid("Operation on closed stream");
+    }
+    return arrow::Status::OK();
+  }
+
+  arrow::Result<int64_t> Tell() const override {
+    RETURN_NOT_OK(CheckClosed());
+    return pos_;
+  }
+
+  arrow::Result<int64_t> GetSize() override {
+    RETURN_NOT_OK(CheckClosed());
+    return content_length_;
+  }
+
+  arrow::Status Seek(int64_t position) override {
+    RETURN_NOT_OK(CheckClosed());
+    // reader_ is never repositioned, so moving pos_ on its own would make Tell() and Read() disagree.
+    if (position != pos_) {
+      return arrow::Status::NotImplemented("Seek is not supported by the opendal reader");
+    }
+    return arrow::Status::OK();
+  }
+
+  arrow::Result<int64_t> ReadAt(int64_t position, int64_t nbytes, void* out) override {
+    return arrow::Status::NotImplemented("Not implemented");
+  }
+
+  arrow::Result<std::shared_ptr<arrow::Buffer>> ReadAt(int64_t position, int64_t nbytes) override {
+    return arrow::Status::NotImplemented("Not implemented");
+  }
+
+  arrow::Result<int64_t> Read(int64_t nbytes, void* out) override {
+    RETURN_NOT_OK(CheckClosed());
+    // Never request more than what is left of the object; a non-positive request reads nothing.
+    nbytes = std::min(nbytes, content_length_ - pos_);
+    if (nbytes <= 0) {
+      return 0;
+    }
+
+    auto result = opendal_reader_read(reader_, static_cast<uint8_t*>(out), nbytes);
+    if (result.error != nullptr) {
+      auto msg = "read failed: " + ToString(result.error->message);
+      opendal_error_free(result.error);
+      return arrow::Status::IOError(msg);
+    }
+    pos_ += result.size;
+    return result.size;
+  }
+
+  arrow::Result<std::shared_ptr<arrow::Buffer>> Read(int64_t nbytes) override {
+    RETURN_NOT_OK(CheckClosed());
+    nbytes = std::max<int64_t>(0, std::min(nbytes, content_length_ - pos_));
+
+    ARROW_ASSIGN_OR_RAISE(auto buf, arrow::AllocateResizableBuffer(nbytes, io_context_.pool()));
+    if (nbytes > 0) {
+      // Read(nbytes, out) already advances pos_; advancing it again here would double-count the bytes.
+      ARROW_ASSIGN_OR_RAISE(int64_t bytes_read, Read(nbytes, buf->mutable_data()));
+      DCHECK_LE(bytes_read, nbytes);
+      RETURN_NOT_OK(buf->Resize(bytes_read));
+    }
+    return std::move(buf);
+  }
+
+ private:
+  const arrow::io::IOContext io_context_;
+  opendal_reader* reader_;
+  opendal_metadata* metadata_;
+
+  bool closed_ = false;
+  int64_t pos_ = 0;
+  int64_t content_length_;
+};
+
+// Open `path` for reading: create the reader, stat the object for its size, and hand both to the file.
+arrow::Result<std::shared_ptr<OpendalInputFile>> read(arrow::io::IOContext& io_context,
+                                                      opendal_operator* op,
+                                                      const std::string& path) {
+  auto reader = opendal_operator_reader(op, path.c_str());
+  if (reader.error != nullptr) {
+    auto msg = "open reader failed: " + ToString(reader.error->message);
+    opendal_error_free(reader.error);
+    return arrow::Status::Invalid(msg);
+  }
+  auto stat = opendal_operator_stat(op, path.c_str());
+  if (stat.error != nullptr) {
+    // Report the stat error; reader.error is null on this path.
+    auto msg = "open reader failed: " + ToString(stat.error->message);
+    opendal_error_free(stat.error);
+    opendal_reader_free(reader.reader);
+    return arrow::Status::Invalid(msg);
+  }
+  // The file now owns reader.reader and stat.meta and frees them in its destructor.
+  auto file = std::make_shared<OpendalInputFile>(io_context, reader.reader, stat.meta);
+  return file;
+}
+
+class OpendalOutputStream : public arrow::io::OutputStream {};
+
+// Build the operator named by the "scheme" option, pass every option to it, and wrap it in a filesystem.
+arrow::Result<std::shared_ptr<OpendalFileSystem>> OpendalFileSystem::Make(const OpendalOptions& options,
+                                                                          const arrow::io::IOContext& ctx) {
+  const auto scheme = options.options().find("scheme");
+  if (scheme == options.options().end()) {
+    return arrow::Status::Invalid("Missing \"scheme\" option for opendal operator");
+  }
+  auto fs = std::shared_ptr<OpendalFileSystem>(new OpendalFileSystem(options, ctx));
+  opendal_operator_options* op_options_ = opendal_operator_options_new();
+  for (auto& option : options.options()) {
+    opendal_operator_options_set(op_options_, option.first.c_str(), option.second.c_str());
+  }
+  auto op = opendal_operator_new(scheme->second.c_str(), op_options_);
+  // The options are only needed while constructing the operator; release them on every path.
+  opendal_operator_options_free(op_options_);
+  if (op.error != nullptr) {
+    auto msg = "open opendal operator failed: " + ToString(op.error->message);
+    opendal_error_free(op.error);
+    return arrow::Status::Invalid(msg);
+  }
+  fs->operator_ = op.op;
+  return fs;
+}
+
+OpendalFileSystem::OpendalFileSystem(const OpendalOptions& options, const arrow::io::IOContext& io_context)
+    : FileSystem(io_context), operator_(nullptr), options_(options) {}
+
+OpendalFileSystem::~OpendalFileSystem() {
+  if (operator_ != nullptr) {
+    opendal_operator_free(operator_);
+  }
+}
+
+// Stat `path` and translate the OpenDAL metadata (size, kind, mtime) into an arrow FileInfo.
+arrow::Result<arrow::fs::FileInfo> OpendalFileSystem::GetFileInfo(const std::string& path) {
+  auto stat = opendal_operator_stat(operator_, path.c_str());
+  if (stat.error != nullptr) {
+    auto msg = "stat failed: " + ToString(stat.error->message);
+    opendal_error_free(stat.error);
+    return arrow::Status::Invalid(msg);
+  }
+  auto file_info = arrow::fs::FileInfo{};
+  file_info.set_path(path);
+  file_info.set_size(opendal_metadata_content_length(stat.meta));
+  file_info.set_type(opendal_metadata_is_dir(stat.meta)    ? arrow::fs::FileType::Directory
+                     : opendal_metadata_is_file(stat.meta) ? arrow::fs::FileType::File
+                                                           : arrow::fs::FileType::Unknown);
+  std::chrono::milliseconds mtime(opendal_metadata_last_modified_ms(stat.meta));
+  file_info.set_mtime(arrow::fs::TimePoint(mtime));
+  opendal_metadata_free(stat.meta);
+
+  return file_info;
+}
+
+bool OpendalFileSystem::Equals(const FileSystem& other) const {
+  if (this == &other) {
+    return true;
+  }
+  if (other.type_name() != type_name()) {
+    return false;
+  }
+  return options_.options() == static_cast<const OpendalFileSystem*>(&other)->options_.options();
+}
+
+// Stat every entry OpenDAL lists for select.base_dir; the selector's other fields are not consulted.
+arrow::Result<std::vector<arrow::fs::FileInfo>> OpendalFileSystem::GetFileInfo(const arrow::fs::FileSelector& select) {
+  std::vector<arrow::fs::FileInfo> file_infos;
+  auto lister = opendal_operator_list(operator_, select.base_dir.c_str());
+  if (lister.error != nullptr) {
+    auto msg = "list failed: " + ToString(lister.error->message);
+    opendal_error_free(lister.error);
+    return arrow::Status::Invalid(msg);
+  }
+  // Guards release the lister and each entry on every return path.
+  std::unique_ptr<opendal_lister, decltype(&opendal_lister_free)> lister_guard(lister.lister, &opendal_lister_free);
+  while (true) {
+    auto entry = opendal_lister_next(lister.lister);
+    // Check the error first: a failed step is not the same as the end of the listing.
+    if (entry.error != nullptr) {
+      auto msg = "list failed: " + ToString(entry.error->message);
+      opendal_error_free(entry.error);
+      return arrow::Status::Invalid(msg);
+    }
+    if (entry.entry == nullptr) {
+      break;
+    }
+    std::unique_ptr<opendal_entry, decltype(&opendal_entry_free)> entry_guard(entry.entry, &opendal_entry_free);
+    // NOTE(review): ownership of the string returned by opendal_entry_path is not visible here -- confirm
+    // against opendal.h whether the caller has to release it.
+    const std::string de_path = opendal_entry_path(entry.entry);
+    ARROW_ASSIGN_OR_RAISE(auto info, GetFileInfo(de_path));
+    file_infos.push_back(std::move(info));
+  }
+  return file_infos;
+}
+
+arrow::Status OpendalFileSystem::CreateDir(const std::string& path, bool recursive) {
+  auto error = opendal_operator_create_dir(operator_, path.c_str());
+  if (error != nullptr) {
+    auto msg = "create dir failed: " + ToString(error->message);
+    opendal_error_free(error);
+    return arrow::Status::Invalid(msg);
+  }
+  return arrow::Status::OK();
+}
+
+arrow::Status OpendalFileSystem::DeleteDir(const std::string& path) {
+  auto error = opendal_operator_delete(operator_, path.c_str());
+  if (error != nullptr) {
+    auto msg = "delete dir failed: " + ToString(error->message);
+    opendal_error_free(error);
+    return arrow::Status::Invalid(msg);
+  }
+  return arrow::Status::OK();
+}
+
+arrow::Status OpendalFileSystem::DeleteDirContents(const std::string& path, bool missing_dir_ok) {
+  return DeleteDir(path);
+}
+
+arrow::Status OpendalFileSystem::DeleteFile(const std::string& path) { return DeleteDir(path); }
+
+arrow::Status OpendalFileSystem::Move(const std::string& src, const std::string& dest) {
+  auto error = opendal_operator_rename(operator_, src.c_str(), dest.c_str());
+  if (error != nullptr) {
+    auto msg = "move failed: " + ToString(error->message);
+    opendal_error_free(error);
+    return arrow::Status::Invalid(msg);
+  }
+  return arrow::Status::OK();
+}
+
+arrow::Status OpendalFileSystem::CopyFile(const std::string& src, const std::string& dest) {
+  auto error = opendal_operator_copy(operator_, src.c_str(), dest.c_str());
+  if (error != nullptr) {
+    auto msg = "copy failed: " + ToString(error->message);
+    opendal_error_free(error);
+    return arrow::Status::Invalid(msg);
+  }
+  return arrow::Status::OK();
+}
+
+arrow::Result<std::shared_ptr<arrow::io::InputStream>> OpendalFileSystem::OpenInputStream(const std::string& path) {
+  return read(io_context_, operator_, path);
+}
+
+arrow::Result<std::shared_ptr<arrow::io::RandomAccessFile>> OpendalFileSystem::OpenInputFile(const std::string& path) {
+  return read(io_context_, operator_, path);
+}
+
+arrow::Result<std::shared_ptr<arrow::io::OutputStream>> OpendalFileSystem::OpenOutputStream(
+    const std::string& path, const std::shared_ptr<const arrow::KeyValueMetadata>& metadata) {
+  return arrow::Status::NotImplemented("OpenOutputStream Not implemented");
+}
+
+arrow::Result<std::shared_ptr<arrow::io::OutputStream>> OpendalFileSystem::OpenAppendStream(
+    const std::string& path, const std::shared_ptr<const arrow::KeyValueMetadata>& metadata) {
+  return arrow::Status::NotImplemented("OpenAppendStream Not implemented");
+}
+
+}  // namespace milvus_storage