diff --git a/cpp/src/storage/space.cpp b/cpp/src/storage/space.cpp index c989fb7f..f828d504 100644 --- a/cpp/src/storage/space.cpp +++ b/cpp/src/storage/space.cpp @@ -112,10 +112,10 @@ Status Space::Write(arrow::RecordBatchReader* reader, WriteOption* option) { std::lock_guard lock(mutex_); auto copied = new Manifest(*manifest_); - auto old_version = manifest_->version(); - scalar_fragment.set_id(old_version + 1); - vector_fragment.set_id(old_version + 1); - copied->set_version(old_version + 1); + auto next_version = next_manifest_version_++; + scalar_fragment.set_id(next_version); + vector_fragment.set_id(next_version); + copied->set_version(next_version); copied->add_scalar_fragment(std::move(scalar_fragment)); copied->add_vector_fragment(std::move(vector_fragment)); RETURN_NOT_OK(SafeSaveManifest(fs_, path_, copied)); @@ -152,11 +152,11 @@ Status Space::Delete(arrow::RecordBatchReader* reader) { if (writer) { writer->Close(); std::lock_guard lock(mutex_); - auto old_version = manifest_->version(); + auto next_version = next_manifest_version_++; auto copied = new Manifest(*manifest_); fragment.add_file(delete_file); - fragment.set_id(old_version + 1); - copied->set_version(old_version + 1); + fragment.set_id(next_version); + copied->set_version(next_version); copied->add_delete_fragment(std::move(fragment)); RETURN_NOT_OK(SafeSaveManifest(fs_, path_, copied)); manifest_.reset(copied); @@ -202,12 +202,7 @@ Result> Space::Open(const std::string& uri, Options optio RETURN_ARROW_NOT_OK(fs->CreateDir(GetManifestDir(path))); - ASSIGN_OR_RETURN_NOT_OK(auto files, FindAllManifest(fs, path)); - std::vector info_vec; - std::copy_if(files.begin(), files.end(), std::back_inserter(info_vec), - [](arrow::fs::FileInfo& f) { return ParseVersionFromFileName(f.base_name()) != -1; }); - - std::cout << info_vec.size() << std::endl; + ASSIGN_OR_RETURN_NOT_OK(auto info_vec, FindAllManifest(fs, path)); if (info_vec.empty()) { // create the first manifest if (options.schema == nullptr) { @@ -217,14 +212,13 @@ Result> Space::Open(const std::string& uri, Options optio RETURN_NOT_OK(SafeSaveManifest(fs, path, manifest.get())); } else { arrow::fs::FileInfo file_info; + auto max_cmp = [](arrow::fs::FileInfo& f1, arrow::fs::FileInfo& f2) { + return ParseVersionFromFileName(f1.base_name()) < ParseVersionFromFileName(f2.base_name()); + }; + auto latest = std::max_element(info_vec.begin(), info_vec.end(), max_cmp); if (options.version == -1) { // find latest manifest - auto max_manifest = - std::max_element(info_vec.begin(), info_vec.end(), [](arrow::fs::FileInfo& f1, arrow::fs::FileInfo& f2) { - return ParseVersionFromFileName(f1.base_name()) < ParseVersionFromFileName(f2.base_name()); - }); - file_info = *max_manifest; - next_manifest_version = ParseVersionFromFileName(file_info.base_name()) + 1; + file_info = *latest; } else { auto iter = std::find_if(info_vec.begin(), info_vec.end(), [&](arrow::fs::FileInfo& f) { return ParseVersionFromFileName(f.base_name()) == options.version; @@ -233,8 +227,8 @@ Result> Space::Open(const std::string& uri, Options optio return Status::ManifestNotFound(); } file_info = *iter; - next_manifest_version = options.version + 1; } + next_manifest_version = ParseVersionFromFileName(file_info.base_name()) + 1; ASSIGN_OR_RETURN_ARROW_NOT_OK(auto istream, fs->OpenInputStream(file_info)); ASSIGN_OR_RETURN_NOT_OK(manifest, Manifest::ParseFromFile(istream, file_info)); @@ -244,7 +238,7 @@ Result> Space::Open(const std::string& uri, Options optio space->fs_ = fs; space->path_ = path; space->manifest_ = manifest; - space->next_manifest_version_.store(next_manifest_version); + space->next_manifest_version_ = next_manifest_version; RETURN_NOT_OK(space->Init()); return space; @@ -256,7 +250,10 @@ Result Space::FindAllManifest(std::shared_ptrGetFileInfo(selector)); + ASSIGN_OR_RETURN_ARROW_NOT_OK(auto files, fs->GetFileInfo(selector)); + std::vector info_vec; + std::copy_if(files.begin(), files.end(), std::back_inserter(info_vec), + [](arrow::fs::FileInfo& f) { return ParseVersionFromFileName(f.base_name()) != -1; }); return info_vec; } diff --git a/cpp/src/storage/space.h b/cpp/src/storage/space.h index 15c5455a..42292e03 100644 --- a/cpp/src/storage/space.h +++ b/cpp/src/storage/space.h @@ -41,7 +41,7 @@ class Space { DeleteFragmentVector delete_fragments_; - std::atomic_int64_t next_manifest_version_ = 0; + int64_t next_manifest_version_ = 0; std::mutex mutex_; friend FilterQueryRecordReader; diff --git a/cpp/test/space_test.cpp b/cpp/test/space_test.cpp index ea2e5652..53bf58dc 100644 --- a/cpp/test/space_test.cpp +++ b/cpp/test/space_test.cpp @@ -49,7 +49,6 @@ TEST(SpaceTest, SpaceWriteTest) { auto uri = "file:///tmp/"; auto res = Space::Open(uri, Options{space_schema, -1}); - std::cout << res.status().ToString() << std::endl; ASSERT_TRUE(res.ok()); auto space = std::move(res.value());