Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add opendal fs implement #91

Merged
merged 1 commit into from
Nov 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 11 additions & 4 deletions cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,25 @@ option(WITH_ASAN "Build with address sanitizer." OFF)
set(CMAKE_CXX_STANDARD 17)
set(CMAKE_EXPORT_COMPILE_COMMANDS ON)

list(APPEND CMAKE_MODULE_PATH "${CMAKE_CURRENT_SOURCE_DIR}/cmake")
include(libopendal)

find_package(Boost REQUIRED)
find_package(Arrow REQUIRED)
find_package(protobuf REQUIRED)
find_package(glog REQUIRED)
find_package(AWSSDK REQUIRED)

file(GLOB_RECURSE SRC_FILES src/*.cpp src/*.cc)
message(STATUS "SRC_FILES: ${SRC_FILES}")
add_library(milvus-storage ${SRC_FILES})
target_include_directories(milvus-storage PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/include/milvus-storage ${CMAKE_CURRENT_SOURCE_DIR}/src)
target_link_libraries(milvus-storage PUBLIC arrow::libarrow arrow::libparquet Boost::boost protobuf::protobuf aws-sdk-cpp::aws-sdk-cpp)
target_link_libraries(milvus-storage PUBLIC glog::glog)
target_link_libraries(milvus-storage PUBLIC
arrow::libarrow
arrow::libparquet
Boost::boost
protobuf::protobuf
glog::glog
opendal
)

if (WITH_UT)
enable_testing()
Expand Down
55 changes: 55 additions & 0 deletions cpp/cmake/libopendal.cmake
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
function(build_opendal)
include(ExternalProject)
set(OPENDAL_NAME "libopendal_c${CMAKE_STATIC_LIBRARY_SUFFIX}")
set(OPENDAL_PREFIX ${CMAKE_BINARY_DIR}/thirdparty/opendal_ep)

file(MAKE_DIRECTORY
"${OPENDAL_PREFIX}"
"${OPENDAL_PREFIX}/lib"
"${OPENDAL_PREFIX}/include"
)

if (CMAKE_BUILD_TYPE STREQUAL "Debug")
set(OPENDAL_BUILD_TYPE "debug")
else()
set(OPENDAL_BUILD_TYPE "release")
endif()

ExternalProject_Add(
opendal_ep
GIT_REPOSITORY https://github.com/apache/incubator-opendal.git
GIT_TAG main
PREFIX ${OPENDAL_PREFIX}
SOURCE_SUBDIR bindings/c
CONFIGURE_COMMAND echo "configure for opendal_ep"
BUILD_COMMAND cargo build --${OPENDAL_BUILD_TYPE}
BUILD_IN_SOURCE 1
INSTALL_COMMAND bash -c "cp ${OPENDAL_PREFIX}/src/opendal_ep/target/${OPENDAL_BUILD_TYPE}/${OPENDAL_NAME} ${OPENDAL_PREFIX}/lib/ && cp ${OPENDAL_PREFIX}/src/opendal_ep/bindings/c/include/opendal.h ${OPENDAL_PREFIX}/include/")


add_library(opendal STATIC IMPORTED)
set_target_properties(opendal
PROPERTIES
IMPORTED_GLOBAL TRUE
IMPORTED_LOCATION "${OPENDAL_PREFIX}/lib/${OPENDAL_NAME}"
INTERFACE_INCLUDE_DIRECTORIES "${OPENDAL_PREFIX}/include")
add_dependencies(opendal opendal_ep)
if(APPLE)
target_link_libraries(opendal INTERFACE "-framework CoreFoundation")
target_link_libraries(opendal INTERFACE "-framework Security")
target_link_libraries(opendal INTERFACE "-framework SystemConfiguration")
endif()

get_target_property(OPENDAL_IMPORTED_LOCATION opendal IMPORTED_LOCATION)
get_target_property(OPENDAL_INTERFACE_INCLUDE_DIRECTORIES opendal INTERFACE_INCLUDE_DIRECTORIES)
message("OPENDAL_IMPORTED_LOCATION: ${OPENDAL_IMPORTED_LOCATION}")
message("OPENDAL_INTERFACE_INCLUDE_DIRECTORIES: ${OPENDAL_INTERFACE_INCLUDE_DIRECTORIES}")
endfunction()

if (opendal_FOUND)
message("Found opendal: ${opendal_INCLUDE_DIRS}")
else()
build_opendal()
endif()


8 changes: 4 additions & 4 deletions cpp/conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,10 @@ class StorageConan(ConanFile):
"with_asan": False,
"with_profiler": False,
"with_ut": True,
"arrow:with_s3": True,
"aws-sdk-cpp:config": True,
"aws-sdk-cpp:text-to-speech": False,
"aws-sdk-cpp:transfer": False,
# "arrow:with_s3": True,
# "aws-sdk-cpp:config": True,
# "aws-sdk-cpp:text-to-speech": False,
# "aws-sdk-cpp:transfer": False,
"arrow:filesystem_layer": True,
"arrow:dataset_modules": True,
"arrow:parquet": True,
Expand Down
65 changes: 65 additions & 0 deletions cpp/include/milvus-storage/common/opendal_fs.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
#include <arrow/filesystem/filesystem.h>
#include <arrow/util/macros.h>
#include <arrow/util/uri.h>
#include "opendal.h"

namespace milvus_storage {

class OpendalOptions {
public:
static arrow::Result<OpendalOptions> FromUri(const arrow::internal::Uri& uri, std::string* out_path);

const std::unordered_map<std::string, std::string>& options() const { return options_; }

const std::string& at(const std::string& key) const { return options_.at(key); }

protected:
std::unordered_map<std::string, std::string> options_;
};

class OpendalFileSystem : public arrow::fs::FileSystem {
public:
~OpendalFileSystem() override;

std::string type_name() const override { return "opendal"; }

bool Equals(const FileSystem& other) const override;

arrow::Result<arrow::fs::FileInfo> GetFileInfo(const std::string& path) override;
arrow::Result<std::vector<arrow::fs::FileInfo>> GetFileInfo(const arrow::fs::FileSelector& select) override;
arrow::fs::FileInfoGenerator GetFileInfoGenerator(const arrow::fs::FileSelector& select) override {
throw std::runtime_error("Not implemented");
};

arrow::Status CreateDir(const std::string& path, bool recursive = true) override;

arrow::Status DeleteDir(const std::string& path) override;
arrow::Status DeleteDirContents(const std::string& path, bool missing_dir_ok = false) override;
arrow::Status DeleteRootDirContents() override { throw std::runtime_error("Not implemented"); }

arrow::Status DeleteFile(const std::string& path) override;

arrow::Status Move(const std::string& src, const std::string& dest) override;

arrow::Status CopyFile(const std::string& src, const std::string& dest) override;

arrow::Result<std::shared_ptr<arrow::io::InputStream>> OpenInputStream(const std::string& path) override;
arrow::Result<std::shared_ptr<arrow::io::RandomAccessFile>> OpenInputFile(const std::string& path) override;

arrow::Result<std::shared_ptr<arrow::io::OutputStream>> OpenOutputStream(
const std::string& path, const std::shared_ptr<const arrow::KeyValueMetadata>& metadata = {}) override;

arrow::Result<std::shared_ptr<arrow::io::OutputStream>> OpenAppendStream(
const std::string& path, const std::shared_ptr<const arrow::KeyValueMetadata>& metadata = {}) override;

/// Create a S3FileSystem instance from the given options.
static arrow::Result<std::shared_ptr<OpendalFileSystem>> Make(
const OpendalOptions& options, const arrow::io::IOContext& = arrow::io::default_io_context());

protected:
OpendalFileSystem(const OpendalOptions& options, const arrow::io::IOContext& io_context);
opendal_operator* operator_;
OpendalOptions options_;
};

} // namespace milvus_storage
36 changes: 22 additions & 14 deletions cpp/src/common/fs_util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
#include <cstdlib>
#include "common/log.h"
#include "common/macro.h"
#include "common/opendal_fs.h"

namespace milvus_storage {

Result<std::shared_ptr<arrow::fs::FileSystem>> BuildFileSystem(const std::string& uri, std::string* out_path) {
Expand All @@ -26,22 +28,28 @@ Result<std::shared_ptr<arrow::fs::FileSystem>> BuildFileSystem(const std::string
// return std::shared_ptr<arrow::fs::FileSystem>(fs);
// }

if (schema == "s3") {
if (!arrow::fs::IsS3Initialized()) {
RETURN_ARROW_NOT_OK(arrow::fs::EnsureS3Initialized());
std::atexit([]() {
auto status = arrow::fs::EnsureS3Finalized();
if (!status.ok()) {
LOG_STORAGE_WARNING_ << "Failed to finalize S3: " << status.message();
}
});
}
ASSIGN_OR_RETURN_ARROW_NOT_OK(auto option, arrow::fs::S3Options::FromUri(uri_parser, out_path));
ASSIGN_OR_RETURN_ARROW_NOT_OK(auto fs, arrow::fs::S3FileSystem::Make(option));

if (schema == "opendal") {
ASSIGN_OR_RETURN_ARROW_NOT_OK(auto option, OpendalOptions::FromUri(uri_parser, out_path));
ASSIGN_OR_RETURN_ARROW_NOT_OK(auto fs, OpendalFileSystem::Make(option));
return std::shared_ptr<arrow::fs::FileSystem>(fs);
}

// if (schema == "s3") {
// if (!arrow::fs::IsS3Initialized()) {
// RETURN_ARROW_NOT_OK(arrow::fs::EnsureS3Initialized());
// std::atexit([]() {
// auto status = arrow::fs::EnsureS3Finalized();
// if (!status.ok()) {
// LOG_STORAGE_WARNING_ << "Failed to finalize S3: " << status.message();
// }
// });
// }
// ASSIGN_OR_RETURN_ARROW_NOT_OK(auto option, arrow::fs::S3Options::FromUri(uri_parser, out_path));
// ASSIGN_OR_RETURN_ARROW_NOT_OK(auto fs, arrow::fs::S3FileSystem::Make(option));
//
// return std::shared_ptr<arrow::fs::FileSystem>(fs);
// }
//
return Status::InvalidArgument("Unsupported schema: " + schema);
}
/**
Expand All @@ -57,4 +65,4 @@ std::string UriToPath(const std::string& uri) {
return std::string("");
}
}
}; // namespace milvus_storage
}; // namespace milvus_storage
Loading
Loading