Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change manfiest file dir to versions #25

Merged
merged 1 commit into from
Jul 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions cpp/src/common/constants.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@ namespace milvus_storage {

const int kReadBatchSize = 1024;

const std::string kManifestTempFileName = "manifest.tmp";
const std::string kManifestFileName = "manifest";
const std::string kManifestTempFileSuffix = ".manifest.tmp";
const std::string kManifestFileSuffix = "manifest";
const std::string kManifestsDir = "versions";
const std::string kParquetDataFileSuffix = ".parquet";
const std::string kOffsetFieldName = "__offset";

} // namespace milvus_storage
} // namespace milvus_storage
11 changes: 9 additions & 2 deletions cpp/src/common/utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include <string>
#include "constants.h"
#include "macro.h"
#include "arrow/filesystem/path_util.h"
namespace milvus_storage {

Result<schema_proto::LogicType> ToProtobufType(arrow::Type::type type) {
Expand Down Expand Up @@ -225,9 +226,15 @@ std::string GetNewParquetFilePath(const std::string& path) {
return path + boost::uuids::to_string(scalar_file_id) + kParquetDataFileSuffix;
}

std::string GetManifestFilePath(const std::string& path) { return path + kManifestFileName; }
std::string GetManifestFilePath(const std::string& path, const int64_t version) {
return arrow::fs::internal::JoinAbstractPath(
std::vector<std::string>{path, kManifestsDir, std::to_string(version) + kManifestFileSuffix});
}

std::string GetManifestTmpFilePath(const std::string& path) { return path + kManifestTempFileName; }
std::string GetManifestTmpFilePath(const std::string& path, const int64_t version) {
return arrow::fs::internal::JoinAbstractPath(
std::vector<std::string>{path, kManifestsDir, std::to_string(version) + kManifestTempFileSuffix});
}

Result<std::shared_ptr<arrow::Schema>> ProjectSchema(std::shared_ptr<arrow::Schema> schema,
std::vector<std::string> columns) {
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/common/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ Result<std::shared_ptr<arrow::Schema>> FromProtobufSchema(const schema_proto::Ar

std::string GetNewParquetFilePath(const std::string& path);

std::string GetManifestFilePath(const std::string& path);
std::string GetManifestFilePath(const std::string& path, int64_t version);

std::string GetManifestTmpFilePath(const std::string& path);
std::string GetManifestTmpFilePath(const std::string& path, int64_t version);

Result<std::shared_ptr<arrow::Schema>> ProjectSchema(std::shared_ptr<arrow::Schema> schema,
std::vector<std::string> columns);
Expand Down
9 changes: 6 additions & 3 deletions cpp/src/storage/space.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@

#include <algorithm>
#include <memory>
#include <mutex>
#include <numeric>

#include "arrow/array/builder_primitive.h"
Expand Down Expand Up @@ -113,6 +114,7 @@ Status Space::Write(arrow::RecordBatchReader* reader, WriteOption* option) {
vector_writer = nullptr;
}

std::lock_guard<std::mutex> lock(mutex_);
auto copied = new Manifest(*manifest_);
auto old_version = manifest_->version();
scalar_fragment.set_id(old_version + 1);
Expand All @@ -121,8 +123,8 @@ Status Space::Write(arrow::RecordBatchReader* reader, WriteOption* option) {
copied->add_scalar_fragment(std::move(scalar_fragment));
copied->add_vector_fragment(std::move(vector_fragment));
RETURN_NOT_OK(SafeSaveManifest(copied));

manifest_.reset(copied);

return Status::OK();
}

Expand Down Expand Up @@ -153,6 +155,7 @@ Status Space::Delete(arrow::RecordBatchReader* reader) {

if (writer) {
writer->Close();
std::lock_guard<std::mutex> lock(mutex_);
auto old_version = manifest_->version();
auto copied = new Manifest(*manifest_);
fragment.add_file(delete_file);
Expand All @@ -175,8 +178,8 @@ std::unique_ptr<arrow::RecordBatchReader> Space::Read(std::shared_ptr<ReadOption
}

Status Space::SafeSaveManifest(const Manifest* manifest) {
auto tmp_manifest_file_path = GetManifestTmpFilePath(UriToPath(manifest->space_options()->uri));
auto manifest_file_path = GetManifestFilePath(UriToPath(manifest->space_options()->uri));
auto tmp_manifest_file_path = GetManifestTmpFilePath(UriToPath(manifest->space_options()->uri), manifest->version());
auto manifest_file_path = GetManifestFilePath(UriToPath(manifest->space_options()->uri), manifest->version());

ASSIGN_OR_RETURN_ARROW_NOT_OK(auto output, fs_->OpenOutputStream(tmp_manifest_file_path));
Manifest::WriteManifestFile(manifest, output.get());
Expand Down
10 changes: 7 additions & 3 deletions cpp/src/storage/space.h
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#pragma once
#include <mutex>
#include "arrow/filesystem/filesystem.h"

#include "storage/manifest.h"
Expand All @@ -21,10 +22,11 @@ class Space {

Status Delete(arrow::RecordBatchReader* reader);

static std::unique_ptr<Space> Open(std::shared_ptr<arrow::fs::FileSystem> fs, std::string path);

static std::unique_ptr<Space> Open();
// Open opened a existed space. It will return a error in status if a space is not existed in path. If version is
// specified, it will restore to the state at this version. If not, it will choose the latest version.
static std::unique_ptr<Space> Open(std::shared_ptr<arrow::fs::FileSystem> fs, std::string path, int64_t version = -1);

// Create created a new space. If there is already a space, a error will be returned.
static std::unique_ptr<Space> Create();

private:
Expand All @@ -38,6 +40,8 @@ class Space {
std::shared_ptr<Manifest> manifest_;
std::shared_ptr<Options> options_;

std::mutex mutex_;

friend FilterQueryRecordReader;
friend RecordReader;
};
Expand Down