diff --git a/cpp/src/common/constants.h b/cpp/src/common/constants.h index b1c00cf1..0cba9d72 100644 --- a/cpp/src/common/constants.h +++ b/cpp/src/common/constants.h @@ -4,9 +4,10 @@ namespace milvus_storage { const int kReadBatchSize = 1024; -const std::string kManifestTempFileName = "manifest.tmp"; -const std::string kManifestFileName = "manifest"; +const std::string kManifestTempFileSuffix = ".manifest.tmp"; +const std::string kManifestFileSuffix = "manifest"; +const std::string kManifestsDir = "versions"; const std::string kParquetDataFileSuffix = ".parquet"; const std::string kOffsetFieldName = "__offset"; -} // namespace milvus_storage \ No newline at end of file +} // namespace milvus_storage diff --git a/cpp/src/common/utils.cpp b/cpp/src/common/utils.cpp index b3aba1d9..d0374127 100644 --- a/cpp/src/common/utils.cpp +++ b/cpp/src/common/utils.cpp @@ -9,6 +9,7 @@ #include #include "constants.h" #include "macro.h" +#include "arrow/filesystem/path_util.h" namespace milvus_storage { Result ToProtobufType(arrow::Type::type type) { @@ -225,9 +226,15 @@ std::string GetNewParquetFilePath(const std::string& path) { return path + boost::uuids::to_string(scalar_file_id) + kParquetDataFileSuffix; } -std::string GetManifestFilePath(const std::string& path) { return path + kManifestFileName; } +std::string GetManifestFilePath(const std::string& path, const int64_t version) { + return arrow::fs::internal::JoinAbstractPath( + std::vector{path, kManifestsDir, std::to_string(version) + kManifestFileSuffix}); +} -std::string GetManifestTmpFilePath(const std::string& path) { return path + kManifestTempFileName; } +std::string GetManifestTmpFilePath(const std::string& path, const int64_t version) { + return arrow::fs::internal::JoinAbstractPath( + std::vector{path, kManifestsDir, std::to_string(version) + kManifestTempFileSuffix}); +} Result> ProjectSchema(std::shared_ptr schema, std::vector columns) { diff --git a/cpp/src/common/utils.h b/cpp/src/common/utils.h index 30b23c98..4a3cd3b4 100644 --- a/cpp/src/common/utils.h +++ b/cpp/src/common/utils.h @@ -13,9 +13,9 @@ Result> FromProtobufSchema(const schema_proto::Ar std::string GetNewParquetFilePath(const std::string& path); -std::string GetManifestFilePath(const std::string& path); +std::string GetManifestFilePath(const std::string& path, int64_t version); -std::string GetManifestTmpFilePath(const std::string& path); +std::string GetManifestTmpFilePath(const std::string& path, int64_t version); Result> ProjectSchema(std::shared_ptr schema, std::vector columns); diff --git a/cpp/src/storage/space.cpp b/cpp/src/storage/space.cpp index 6e9c3611..aa0d0d10 100644 --- a/cpp/src/storage/space.cpp +++ b/cpp/src/storage/space.cpp @@ -1,6 +1,7 @@ #include #include +#include #include #include "arrow/array/builder_primitive.h" @@ -113,6 +114,7 @@ Status Space::Write(arrow::RecordBatchReader* reader, WriteOption* option) { vector_writer = nullptr; } + std::lock_guard lock(mutex_); auto copied = new Manifest(*manifest_); auto old_version = manifest_->version(); scalar_fragment.set_id(old_version + 1); @@ -121,8 +123,8 @@ Status Space::Write(arrow::RecordBatchReader* reader, WriteOption* option) { copied->add_scalar_fragment(std::move(scalar_fragment)); copied->add_vector_fragment(std::move(vector_fragment)); RETURN_NOT_OK(SafeSaveManifest(copied)); - manifest_.reset(copied); + return Status::OK(); } @@ -153,6 +155,7 @@ Status Space::Delete(arrow::RecordBatchReader* reader) { if (writer) { writer->Close(); + std::lock_guard lock(mutex_); auto old_version = manifest_->version(); auto copied = new Manifest(*manifest_); fragment.add_file(delete_file); @@ -175,8 +178,8 @@ std::unique_ptr Space::Read(std::shared_ptrspace_options()->uri)); - auto manifest_file_path = GetManifestFilePath(UriToPath(manifest->space_options()->uri)); + auto tmp_manifest_file_path = GetManifestTmpFilePath(UriToPath(manifest->space_options()->uri), manifest->version()); + auto manifest_file_path = GetManifestFilePath(UriToPath(manifest->space_options()->uri), manifest->version()); ASSIGN_OR_RETURN_ARROW_NOT_OK(auto output, fs_->OpenOutputStream(tmp_manifest_file_path)); Manifest::WriteManifestFile(manifest, output.get()); diff --git a/cpp/src/storage/space.h b/cpp/src/storage/space.h index b4429c3e..5d4f8ad0 100644 --- a/cpp/src/storage/space.h +++ b/cpp/src/storage/space.h @@ -1,4 +1,5 @@ #pragma once +#include #include "arrow/filesystem/filesystem.h" #include "storage/manifest.h" @@ -21,10 +22,11 @@ class Space { Status Delete(arrow::RecordBatchReader* reader); - static std::unique_ptr Open(std::shared_ptr fs, std::string path); - - static std::unique_ptr Open(); + // Open opened a existed space. It will return a error in status if a space is not existed in path. If version is + // specified, it will restore to the state at this version. If not, it will choose the latest version. + static std::unique_ptr Open(std::shared_ptr fs, std::string path, int64_t version = -1); + // Create created a new space. If there is already a space, a error will be returned. static std::unique_ptr Create(); private: @@ -38,6 +40,8 @@ class Space { std::shared_ptr manifest_; std::shared_ptr options_; + std::mutex mutex_; + friend FilterQueryRecordReader; friend RecordReader; };