Skip to content

Commit

Permalink
Fix manfiest version allocation error (#30)
Browse files Browse the repository at this point in the history
Signed-off-by: sunby <[email protected]>
  • Loading branch information
sunby committed Jul 26, 2023
1 parent 10dc26d commit 941a649
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 24 deletions.
41 changes: 19 additions & 22 deletions cpp/src/storage/space.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,10 +112,10 @@ Status Space::Write(arrow::RecordBatchReader* reader, WriteOption* option) {

std::lock_guard<std::mutex> lock(mutex_);
auto copied = new Manifest(*manifest_);
auto old_version = manifest_->version();
scalar_fragment.set_id(old_version + 1);
vector_fragment.set_id(old_version + 1);
copied->set_version(old_version + 1);
auto next_version = next_manifest_version_++;
scalar_fragment.set_id(next_version);
vector_fragment.set_id(next_version);
copied->set_version(next_version);
copied->add_scalar_fragment(std::move(scalar_fragment));
copied->add_vector_fragment(std::move(vector_fragment));
RETURN_NOT_OK(SafeSaveManifest(fs_, path_, copied));
Expand Down Expand Up @@ -152,11 +152,11 @@ Status Space::Delete(arrow::RecordBatchReader* reader) {
if (writer) {
writer->Close();
std::lock_guard<std::mutex> lock(mutex_);
auto old_version = manifest_->version();
auto next_version = next_manifest_version_++;
auto copied = new Manifest(*manifest_);
fragment.add_file(delete_file);
fragment.set_id(old_version + 1);
copied->set_version(old_version + 1);
fragment.set_id(next_version);
copied->set_version(next_version);
copied->add_delete_fragment(std::move(fragment));
RETURN_NOT_OK(SafeSaveManifest(fs_, path_, copied));
manifest_.reset(copied);
Expand Down Expand Up @@ -202,12 +202,7 @@ Result<std::unique_ptr<Space>> Space::Open(const std::string& uri, Options optio

RETURN_ARROW_NOT_OK(fs->CreateDir(GetManifestDir(path)));

ASSIGN_OR_RETURN_NOT_OK(auto files, FindAllManifest(fs, path));
std::vector<arrow::fs::FileInfo> info_vec;
std::copy_if(files.begin(), files.end(), std::back_inserter(info_vec),
[](arrow::fs::FileInfo& f) { return ParseVersionFromFileName(f.base_name()) != -1; });

std::cout << info_vec.size() << std::endl;
ASSIGN_OR_RETURN_NOT_OK(auto info_vec, FindAllManifest(fs, path));
if (info_vec.empty()) {
// create the first manifest
if (options.schema == nullptr) {
Expand All @@ -217,14 +212,13 @@ Result<std::unique_ptr<Space>> Space::Open(const std::string& uri, Options optio
RETURN_NOT_OK(SafeSaveManifest(fs, path, manifest.get()));
} else {
arrow::fs::FileInfo file_info;
auto max_cmp = [](arrow::fs::FileInfo& f1, arrow::fs::FileInfo& f2) {
return ParseVersionFromFileName(f1.base_name()) < ParseVersionFromFileName(f2.base_name());
};
auto latest = std::max_element(info_vec.begin(), info_vec.end(), max_cmp);
if (options.version == -1) {
// find latest manifest
auto max_manifest =
std::max_element(info_vec.begin(), info_vec.end(), [](arrow::fs::FileInfo& f1, arrow::fs::FileInfo& f2) {
return ParseVersionFromFileName(f1.base_name()) < ParseVersionFromFileName(f2.base_name());
});
file_info = *max_manifest;
next_manifest_version = ParseVersionFromFileName(file_info.base_name()) + 1;
file_info = *latest;
} else {
auto iter = std::find_if(info_vec.begin(), info_vec.end(), [&](arrow::fs::FileInfo& f) {
return ParseVersionFromFileName(f.base_name()) == options.version;
Expand All @@ -233,8 +227,8 @@ Result<std::unique_ptr<Space>> Space::Open(const std::string& uri, Options optio
return Status::ManifestNotFound();
}
file_info = *iter;
next_manifest_version = options.version + 1;
}
next_manifest_version = ParseVersionFromFileName(file_info.base_name()) + 1;

ASSIGN_OR_RETURN_ARROW_NOT_OK(auto istream, fs->OpenInputStream(file_info));
ASSIGN_OR_RETURN_NOT_OK(manifest, Manifest::ParseFromFile(istream, file_info));
Expand All @@ -244,7 +238,7 @@ Result<std::unique_ptr<Space>> Space::Open(const std::string& uri, Options optio
space->fs_ = fs;
space->path_ = path;
space->manifest_ = manifest;
space->next_manifest_version_.store(next_manifest_version);
space->next_manifest_version_ = next_manifest_version;

RETURN_NOT_OK(space->Init());
return space;
Expand All @@ -256,7 +250,10 @@ Result<arrow::fs::FileInfoVector> Space::FindAllManifest(std::shared_ptr<arrow::
selector.allow_not_found = true;
selector.base_dir = path;

ASSIGN_OR_RETURN_ARROW_NOT_OK(auto info_vec, fs->GetFileInfo(selector));
ASSIGN_OR_RETURN_ARROW_NOT_OK(auto files, fs->GetFileInfo(selector));
std::vector<arrow::fs::FileInfo> info_vec;
std::copy_if(files.begin(), files.end(), std::back_inserter(info_vec),
[](arrow::fs::FileInfo& f) { return ParseVersionFromFileName(f.base_name()) != -1; });
return info_vec;
}

Expand Down
2 changes: 1 addition & 1 deletion cpp/src/storage/space.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class Space {

DeleteFragmentVector delete_fragments_;

std::atomic_int64_t next_manifest_version_ = 0;
int64_t next_manifest_version_ = 0;
std::mutex mutex_;

friend FilterQueryRecordReader;
Expand Down
1 change: 0 additions & 1 deletion cpp/test/space_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ TEST(SpaceTest, SpaceWriteTest) {

auto uri = "file:///tmp/";
auto res = Space::Open(uri, Options{space_schema, -1});
std::cout << res.status().ToString() << std::endl;
ASSERT_TRUE(res.ok());
auto space = std::move(res.value());

Expand Down

0 comments on commit 941a649

Please sign in to comment.