From 8743cb4b786333cd3c20efe2b84842599f4cfb74 Mon Sep 17 00:00:00 2001 From: Bingyi Sun Date: Wed, 9 Aug 2023 14:33:15 +0800 Subject: [PATCH] Add blob interfaces in space (#37) Signed-off-by: sunby --- cpp/src/common/constants.h | 1 + cpp/src/common/status.cpp | 4 +++- cpp/src/common/status.h | 6 +++--- cpp/src/common/utils.cpp | 9 +++++++++ cpp/src/common/utils.h | 3 +++ cpp/src/storage/manifest.cpp | 19 ++++++++++++++++++ cpp/src/storage/manifest.h | 8 ++++++++ cpp/src/storage/space.cpp | 37 ++++++++++++++++++++++++++++++++---- cpp/src/storage/space.h | 4 ++-- 9 files changed, 81 insertions(+), 10 deletions(-) diff --git a/cpp/src/common/constants.h b/cpp/src/common/constants.h index 1eea433a..d8c15d12 100644 --- a/cpp/src/common/constants.h +++ b/cpp/src/common/constants.h @@ -10,6 +10,7 @@ const std::string kManifestsDir = "versions"; const std::string kScalarDataDir = "scalar"; const std::string kVectorDataDir = "vector"; const std::string kDeleteDataDir = "delete"; +const std::string kBlobDir = "blobs"; const std::string kParquetDataFileSuffix = ".parquet"; const std::string kOffsetFieldName = "__offset"; diff --git a/cpp/src/common/status.cpp b/cpp/src/common/status.cpp index cb2f7ada..6c20612b 100644 --- a/cpp/src/common/status.cpp +++ b/cpp/src/common/status.cpp @@ -26,6 +26,8 @@ std::string Status::ToString() const { case kInternalStateError: res = "InternalStateError: "; break; + case kFileNotFound: + res = "FileNotFound: "; default: std::sprintf(tmp, "Unknown code(%d): ", code_); res = tmp; @@ -35,4 +37,4 @@ std::string Status::ToString() const { return res; } -} // namespace milvus_storage \ No newline at end of file +} // namespace milvus_storage diff --git a/cpp/src/common/status.h b/cpp/src/common/status.h index f9a5d25e..cea44afe 100644 --- a/cpp/src/common/status.h +++ b/cpp/src/common/status.h @@ -19,7 +19,7 @@ class Status { static Status InternalStateError(const std::string& msg) { return Status(kInternalStateError, msg); } - static Status ManifestNotFound(const std::string& msg = "") { return Status(kManifestNotFound, msg); } + static Status FileNotFound(const std::string& msg = "") { return Status(kFileNotFound, msg); } bool ok() const { return code_ == kOk; } @@ -29,7 +29,7 @@ class Status { bool IsInternalStateError() const { return code_ == kInternalStateError; } - bool IsManifestNotFound() const { return code_ == kManifestNotFound; } + bool IsManifestNotFound() const { return code_ == kFileNotFound; } std::string ToString() const; @@ -39,7 +39,7 @@ class Status { kArrowError = 1, kInvalidArgument = 2, kInternalStateError = 3, - kManifestNotFound = 4, + kFileNotFound = 4, }; explicit Status(Code code, const std::string& msg = "") : code_(code), msg_(msg) {} diff --git a/cpp/src/common/utils.cpp b/cpp/src/common/utils.cpp index e6268c22..5c0df1f6 100644 --- a/cpp/src/common/utils.cpp +++ b/cpp/src/common/utils.cpp @@ -253,6 +253,15 @@ std::string GetManifestTmpFilePath(const std::string& path, const int64_t versio return arrow::fs::internal::JoinAbstractPath( std::vector{path, kManifestsDir, std::to_string(version) + kManifestTempFileSuffix}); } +std::string GetBolbDir(const std::string& path) { + return arrow::fs::internal::JoinAbstractPath(std::vector{path, kBlobDir}); +} + +std::string GetNewBlobFilePath(const std::string& path) { + auto scalar_file_id = boost::uuids::random_generator()(); + return arrow::fs::internal::JoinAbstractPath( + std::vector{path, kBlobDir, boost::uuids::to_string(scalar_file_id)}); +} int64_t ParseVersionFromFileName(const std::string& path) { auto pos = path.find(kManifestFileSuffix); diff --git a/cpp/src/common/utils.h b/cpp/src/common/utils.h index aed19d97..e6d60916 100644 --- a/cpp/src/common/utils.h +++ b/cpp/src/common/utils.h @@ -27,4 +27,7 @@ std::string GetManifestDir(const std::string& path); std::string GetScalarDataDir(const std::string& path); std::string GetVectorDataDir(const std::string& path); std::string GetDeleteDataDir(const std::string& path); +std::string GetBolbDir(const std::string& path); + +std::string GetNewBlobFilePath(const std::string& path); } // namespace milvus_storage diff --git a/cpp/src/storage/manifest.cpp b/cpp/src/storage/manifest.cpp index a625257e..da1b5692 100644 --- a/cpp/src/storage/manifest.cpp +++ b/cpp/src/storage/manifest.cpp @@ -15,12 +15,31 @@ void Manifest::add_vector_fragment(Fragment&& fragment) { vector_fragments_.push void Manifest::add_delete_fragment(Fragment&& fragment) { delete_fragments_.push_back(fragment); } +void Manifest::add_blob(Blob&& blob) { blobs_.emplace_back(blob); } + const FragmentVector& Manifest::scalar_fragments() const { return scalar_fragments_; } const FragmentVector& Manifest::vector_fragments() const { return vector_fragments_; } const FragmentVector& Manifest::delete_fragments() const { return delete_fragments_; } +bool Manifest::has_blob(std::string& name) { + auto iter = std::find_if(blobs_.begin(), blobs_.end(), [&](Blob& blob) { return blob.name == name; }); + return iter != blobs_.end(); +} + +void Manifest::remove_blob_if_exist(std::string& name) { + std::remove_if(blobs_.begin(), blobs_.end(), [&](Blob& blob) { return blob.name == name; }); +} + +Result Manifest::get_blob(std::string& name) { + auto iter = std::find_if(blobs_.begin(), blobs_.end(), [&](Blob& blob) { return blob.name == name; }); + if (iter == blobs_.end()) { + return Status::FileNotFound("blob not found"); + } + return *iter; +} + int64_t Manifest::version() const { return version_; } void Manifest::set_version(int64_t version) { version_ = version; } diff --git a/cpp/src/storage/manifest.h b/cpp/src/storage/manifest.h index ada041d3..2bbfbd54 100644 --- a/cpp/src/storage/manifest.h +++ b/cpp/src/storage/manifest.h @@ -19,12 +19,20 @@ class Manifest { void add_delete_fragment(Fragment&& fragment); + void add_blob(Blob&& blob); + const FragmentVector& scalar_fragments() const; const FragmentVector& vector_fragments() const; const FragmentVector& delete_fragments() const; + bool has_blob(std::string& name); + + void remove_blob_if_exist(std::string& name); + + Result get_blob(std::string& name); + int64_t version() const; void set_version(int64_t version); diff --git a/cpp/src/storage/space.cpp b/cpp/src/storage/space.cpp index 36644b77..b809a8fd 100644 --- a/cpp/src/storage/space.cpp +++ b/cpp/src/storage/space.cpp @@ -1,6 +1,7 @@ #include #include +#include #include #include #include @@ -174,11 +175,39 @@ std::unique_ptr Space::Read(std::shared_ptrschema(), fs_, delete_fragments_, option); } -Status Space::WriteBolb(std::string name, char* blob, int64_t length, bool replace) {} +Status Space::WriteBolb(std::string name, void* blob, int64_t length, bool replace) { + if (!replace && manifest_->has_blob(name)) { + return Status::InvalidArgument("blob already exist"); + } + + std::string blob_file_path = GetNewBlobFilePath(path_); + ASSIGN_OR_RETURN_ARROW_NOT_OK(auto output, fs_->OpenOutputStream(blob_file_path)); + RETURN_ARROW_NOT_OK(output->Write(blob, length)); + RETURN_ARROW_NOT_OK(output->Close()); + + std::lock_guard lock(mutex_); + auto next_version = next_manifest_version_++; + auto copied = new Manifest(*manifest_); + copied->remove_blob_if_exist(name); + copied->add_blob({name, length, blob_file_path}); + RETURN_NOT_OK(SafeSaveManifest(fs_, path_, copied)); + manifest_.reset(copied); + return Status::OK(); +} -Status Space::ReadBlob(std::string name, char* target) {} +Status Space::ReadBlob(std::string name, void* target) { + auto manifest = manifest_; + ASSIGN_OR_RETURN_NOT_OK(auto blob, manifest->get_blob(name)); + ASSIGN_OR_RETURN_ARROW_NOT_OK(auto file, fs_->OpenInputFile(blob.file)); + ASSIGN_OR_RETURN_ARROW_NOT_OK(auto _, file->Read(blob.size, target)); + return Status::OK(); +} -Result Space::GetBlobByteSize(std::string name) {} +Result Space::GetBlobByteSize(std::string name) { + auto manifest = manifest_; + ASSIGN_OR_RETURN_NOT_OK(auto blob, manifest->get_blob(name)); + return blob.size; +} Status Space::SafeSaveManifest(std::shared_ptr fs, const std::string& path, @@ -233,7 +262,7 @@ Result> Space::Open(const std::string& uri, Options optio return ParseVersionFromFileName(f.base_name()) == options.version; }); if (iter == info_vec.end()) { - return Status::ManifestNotFound(); + return Status::FileNotFound(); } file_info = *iter; } diff --git a/cpp/src/storage/space.h b/cpp/src/storage/space.h index 2d0400d9..1f6c79ba 100644 --- a/cpp/src/storage/space.h +++ b/cpp/src/storage/space.h @@ -25,10 +25,10 @@ class Space { static Result> Open(const std::string& uri, Options options); // Write a blob to space. Will return a error if replace is false and a blob with the same name exists. - Status WriteBolb(std::string name, char* blob, int64_t length, bool replace = false); + Status WriteBolb(std::string name, void* blob, int64_t length, bool replace = false); // Read a blob from space, the target must have enough size to hold this blob. - Status ReadBlob(std::string name, char* target); + Status ReadBlob(std::string name, void* target); // Get the byte size of a blob. Result GetBlobByteSize(std::string name);