Skip to content

Commit

Permalink
Change manifest file dir to versions (#25)
Browse files Browse the repository at this point in the history
Signed-off-by: sunby <[email protected]>
  • Loading branch information
sunby committed Jul 25, 2023
1 parent b940932 commit 057d6ee
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 13 deletions.
7 changes: 4 additions & 3 deletions cpp/src/common/constants.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@ namespace milvus_storage {

// Batch size used when reading (presumably a row count -- confirm against the readers).
const int kReadBatchSize = 1024;

// File names appended directly to the space path by the single-argument manifest path helpers.
const std::string kManifestTempFileName = "manifest.tmp";
const std::string kManifestFileName = "manifest";
// Suffixes appended to the decimal manifest version to form a file name inside kManifestsDir,
// e.g. "3.manifest" and "3.manifest.tmp". Both start with '.' so the version number is
// separated from the suffix and the temp name is exactly the final name plus ".tmp".
const std::string kManifestTempFileSuffix = ".manifest.tmp";
const std::string kManifestFileSuffix = ".manifest";
// Directory, relative to the space path, that holds the versioned manifest files.
const std::string kManifestsDir = "versions";
// Suffix appended to generated parquet data file names.
const std::string kParquetDataFileSuffix = ".parquet";
// Name of the offset field (usage is not visible in this file).
const std::string kOffsetFieldName = "__offset";

} // namespace milvus_storage
} // namespace milvus_storage
11 changes: 9 additions & 2 deletions cpp/src/common/utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include <string>
#include "constants.h"
#include "macro.h"
#include "arrow/filesystem/path_util.h"
namespace milvus_storage {

Result<schema_proto::LogicType> ToProtobufType(arrow::Type::type type) {
Expand Down Expand Up @@ -225,9 +226,15 @@ std::string GetNewParquetFilePath(const std::string& path) {
return path + boost::uuids::to_string(scalar_file_id) + kParquetDataFileSuffix;
}

// Returns `path` with kManifestFileName appended directly; no separator is inserted,
// so `path` must already end with one if a separator is wanted.
std::string GetManifestFilePath(const std::string& path) {
  std::string manifest_path = path;
  manifest_path += kManifestFileName;
  return manifest_path;
}
// Builds the path of the manifest file for `version` in the space rooted at `path`.
// The segments `path`, kManifestsDir and "<version>" + kManifestFileSuffix are joined
// with Arrow's abstract-path joiner.
std::string GetManifestFilePath(const std::string& path, const int64_t version) {
  const std::string file_name = std::to_string(version) + kManifestFileSuffix;
  const std::vector<std::string> segments{path, kManifestsDir, file_name};
  return arrow::fs::internal::JoinAbstractPath(segments);
}

// Returns `path` with kManifestTempFileName appended directly; no separator is inserted,
// so `path` must already end with one if a separator is wanted.
std::string GetManifestTmpFilePath(const std::string& path) {
  std::string tmp_manifest_path = path;
  tmp_manifest_path += kManifestTempFileName;
  return tmp_manifest_path;
}
// Builds the path of the temporary manifest file for `version` in the space rooted at `path`.
// The segments `path`, kManifestsDir and "<version>" + kManifestTempFileSuffix are joined
// with Arrow's abstract-path joiner.
std::string GetManifestTmpFilePath(const std::string& path, const int64_t version) {
  const std::string tmp_file_name = std::to_string(version) + kManifestTempFileSuffix;
  const std::vector<std::string> segments{path, kManifestsDir, tmp_file_name};
  return arrow::fs::internal::JoinAbstractPath(segments);
}

Result<std::shared_ptr<arrow::Schema>> ProjectSchema(std::shared_ptr<arrow::Schema> schema,
std::vector<std::string> columns) {
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/common/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ Result<std::shared_ptr<arrow::Schema>> FromProtobufSchema(const schema_proto::Ar

// Returns a new parquet data file path: `path` followed by a generated UUID string and the
// parquet data file suffix. No separator is inserted between `path` and the UUID.
std::string GetNewParquetFilePath(const std::string& path);

// Returns `path` with the manifest file name appended directly (no separator is inserted).
std::string GetManifestFilePath(const std::string& path);
// Returns the path of the manifest file for `version`, located in the manifests directory
// under `path`. The file name is the decimal version followed by the manifest file suffix.
std::string GetManifestFilePath(const std::string& path, int64_t version);

// Returns `path` with the temporary manifest file name appended directly (no separator is inserted).
std::string GetManifestTmpFilePath(const std::string& path);
// Returns the path of the temporary manifest file for `version`, located in the manifests
// directory under `path`. The file name is the decimal version followed by the temporary
// manifest file suffix.
std::string GetManifestTmpFilePath(const std::string& path, int64_t version);

// NOTE(review): presumably returns `schema` restricted to the fields named in `columns`;
// the definition is not visible here -- confirm.
Result<std::shared_ptr<arrow::Schema>> ProjectSchema(std::shared_ptr<arrow::Schema> schema,
std::vector<std::string> columns);
Expand Down
9 changes: 6 additions & 3 deletions cpp/src/storage/space.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@

#include <algorithm>
#include <memory>
#include <mutex>
#include <numeric>

#include "arrow/array/builder_primitive.h"
Expand Down Expand Up @@ -113,6 +114,7 @@ Status Space::Write(arrow::RecordBatchReader* reader, WriteOption* option) {
vector_writer = nullptr;
}

std::lock_guard<std::mutex> lock(mutex_);
auto copied = new Manifest(*manifest_);
auto old_version = manifest_->version();
scalar_fragment.set_id(old_version + 1);
Expand All @@ -121,8 +123,8 @@ Status Space::Write(arrow::RecordBatchReader* reader, WriteOption* option) {
copied->add_scalar_fragment(std::move(scalar_fragment));
copied->add_vector_fragment(std::move(vector_fragment));
RETURN_NOT_OK(SafeSaveManifest(copied));

manifest_.reset(copied);

return Status::OK();
}

Expand Down Expand Up @@ -153,6 +155,7 @@ Status Space::Delete(arrow::RecordBatchReader* reader) {

if (writer) {
writer->Close();
std::lock_guard<std::mutex> lock(mutex_);
auto old_version = manifest_->version();
auto copied = new Manifest(*manifest_);
fragment.add_file(delete_file);
Expand All @@ -175,8 +178,8 @@ std::unique_ptr<arrow::RecordBatchReader> Space::Read(std::shared_ptr<ReadOption
}

Status Space::SafeSaveManifest(const Manifest* manifest) {
auto tmp_manifest_file_path = GetManifestTmpFilePath(UriToPath(manifest->space_options()->uri));
auto manifest_file_path = GetManifestFilePath(UriToPath(manifest->space_options()->uri));
auto tmp_manifest_file_path = GetManifestTmpFilePath(UriToPath(manifest->space_options()->uri), manifest->version());
auto manifest_file_path = GetManifestFilePath(UriToPath(manifest->space_options()->uri), manifest->version());

ASSIGN_OR_RETURN_ARROW_NOT_OK(auto output, fs_->OpenOutputStream(tmp_manifest_file_path));
Manifest::WriteManifestFile(manifest, output.get());
Expand Down
10 changes: 7 additions & 3 deletions cpp/src/storage/space.h
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#pragma once
#include <mutex>
#include "arrow/filesystem/filesystem.h"

#include "storage/manifest.h"
Expand All @@ -21,10 +22,11 @@ class Space {

Status Delete(arrow::RecordBatchReader* reader);

static std::unique_ptr<Space> Open(std::shared_ptr<arrow::fs::FileSystem> fs, std::string path);

static std::unique_ptr<Space> Open();
// Open opens an existing space. It returns an error in status if no space exists at path. If version is
// specified, the space is restored to its state at that version. If not, the latest version is chosen.
static std::unique_ptr<Space> Open(std::shared_ptr<arrow::fs::FileSystem> fs, std::string path, int64_t version = -1);

// Create creates a new space. If a space already exists, an error is returned.
static std::unique_ptr<Space> Create();

private:
Expand All @@ -38,6 +40,8 @@ class Space {
std::shared_ptr<Manifest> manifest_;
std::shared_ptr<Options> options_;

std::mutex mutex_;

friend FilterQueryRecordReader;
friend RecordReader;
};
Expand Down

0 comments on commit 057d6ee

Please sign in to comment.