Skip to content

Commit

Permalink
[Cpp]: change some function signature (#106)
Browse files Browse the repository at this point in the history
Signed-off-by: sunby <[email protected]>
  • Loading branch information
sunby committed Jan 3, 2024
1 parent 1f62f9c commit ec1bbbf
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 28 deletions.
2 changes: 1 addition & 1 deletion cpp/include/milvus-storage/storage/manifest.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ class Manifest {

void FromProtobuf(const manifest_proto::Manifest& manifest);

static Status WriteManifestFile(const Manifest* manifest, arrow::io::OutputStream* output);
static Status WriteManifestFile(const Manifest& manifest, arrow::io::OutputStream& output);

static Result<std::shared_ptr<Manifest>> ParseFromFile(std::shared_ptr<arrow::io::InputStream> istream,
arrow::fs::FileInfo& file_info);
Expand Down
7 changes: 2 additions & 5 deletions cpp/include/milvus-storage/storage/space.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,9 @@ class Space {
private:
Status Init();

static Status SafeSaveManifest(std::shared_ptr<arrow::fs::FileSystem> fs,
const std::string& path,
const Manifest* manifest);
static Status SafeSaveManifest(arrow::fs::FileSystem& fs, const std::string& path, const Manifest& manifest);

static Result<arrow::fs::FileInfoVector> FindAllManifest(std::shared_ptr<arrow::fs::FileSystem> fs,
const std::string& path);
static Result<arrow::fs::FileInfoVector> FindAllManifest(arrow::fs::FileSystem& fs, const std::string& path);

std::shared_ptr<arrow::fs::FileSystem> fs_;
std::shared_ptr<Manifest> manifest_;
Expand Down
12 changes: 6 additions & 6 deletions cpp/src/storage/manifest.cpp
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
// Copyright 2023 Zilliz
//
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//
// http://www.apache.org/licenses/LICENSE-2.0
//
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand Down Expand Up @@ -104,12 +104,12 @@ void Manifest::FromProtobuf(const manifest_proto::Manifest& manifest) {
version_ = manifest.version();
}

Status Manifest::WriteManifestFile(const Manifest* manifest, arrow::io::OutputStream* output) {
ASSIGN_OR_RETURN_NOT_OK(auto proto_manifest, manifest->ToProtobuf());
Status Manifest::WriteManifestFile(const Manifest& manifest, arrow::io::OutputStream& output) {
ASSIGN_OR_RETURN_NOT_OK(auto proto_manifest, manifest.ToProtobuf());
auto size = proto_manifest.ByteSizeLong();
char* buffer = new char[size];
proto_manifest.SerializeToArray(buffer, static_cast<int>(size));
RETURN_ARROW_NOT_OK(output->Write(buffer, size));
RETURN_ARROW_NOT_OK(output.Write(buffer, size));
delete[] buffer;
return Status::OK();
}
Expand Down
29 changes: 13 additions & 16 deletions cpp/src/storage/space.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ Status Space::Write(arrow::RecordBatchReader& reader, const WriteOption& option)
copied->set_version(next_version);
copied->add_scalar_fragment(std::move(scalar_fragment));
copied->add_vector_fragment(std::move(vector_fragment));
RETURN_NOT_OK(SafeSaveManifest(fs_, path_, copied));
RETURN_NOT_OK(SafeSaveManifest(*fs_, path_, *copied));
manifest_.reset(copied);

return Status::OK();
Expand Down Expand Up @@ -173,7 +173,7 @@ Status Space::Delete(arrow::RecordBatchReader& reader) {
fragment.set_id(next_version);
copied->set_version(next_version);
copied->add_delete_fragment(std::move(fragment));
RETURN_NOT_OK(SafeSaveManifest(fs_, path_, copied));
RETURN_NOT_OK(SafeSaveManifest(*fs_, path_, *copied));
manifest_.reset(copied);
}
return Status::OK();
Expand All @@ -200,7 +200,7 @@ Status Space::WriteBlob(const std::string& name, const void* blob, int64_t lengt
copied->set_version(next_version);
copied->remove_blob_if_exist(name);
copied->add_blob({name, length, blob_file_path});
RETURN_NOT_OK(SafeSaveManifest(fs_, path_, copied));
RETURN_NOT_OK(SafeSaveManifest(*fs_, path_, *copied));
manifest_.reset(copied);
return Status::OK();
}
Expand All @@ -219,18 +219,16 @@ Result<int64_t> Space::GetBlobByteSize(const std::string& name) const {
return blob.size;
}

Status Space::SafeSaveManifest(std::shared_ptr<arrow::fs::FileSystem> fs,
const std::string& path,
const Manifest* manifest) {
auto tmp_manifest_file_path = GetManifestTmpFilePath(path, manifest->version());
auto manifest_file_path = GetManifestFilePath(path, manifest->version());
Status Space::SafeSaveManifest(arrow::fs::FileSystem& fs, const std::string& path, const Manifest& manifest) {
auto tmp_manifest_file_path = GetManifestTmpFilePath(path, manifest.version());
auto manifest_file_path = GetManifestFilePath(path, manifest.version());

ASSIGN_OR_RETURN_ARROW_NOT_OK(auto output, fs->OpenOutputStream(tmp_manifest_file_path));
Manifest::WriteManifestFile(manifest, output.get());
ASSIGN_OR_RETURN_ARROW_NOT_OK(auto output, fs.OpenOutputStream(tmp_manifest_file_path));
Manifest::WriteManifestFile(manifest, *output);
RETURN_ARROW_NOT_OK(output->Flush());
RETURN_ARROW_NOT_OK(output->Close());

RETURN_ARROW_NOT_OK(fs->Move(tmp_manifest_file_path, manifest_file_path));
RETURN_ARROW_NOT_OK(fs.Move(tmp_manifest_file_path, manifest_file_path));
return Status::OK();
}

Expand All @@ -249,7 +247,7 @@ Result<std::unique_ptr<Space>> Space::Open(const std::string& uri, const Options
RETURN_ARROW_NOT_OK(fs->CreateDir(GetDeleteDataDir(path)));
RETURN_ARROW_NOT_OK(fs->CreateDir(GetBlobDir(path)));

ASSIGN_OR_RETURN_NOT_OK(auto info_vec, FindAllManifest(fs, path));
ASSIGN_OR_RETURN_NOT_OK(auto info_vec, FindAllManifest(*fs, path));
if (info_vec.empty()) {
// create the first manifest
if (options.schema == nullptr) {
Expand All @@ -258,7 +256,7 @@ Result<std::unique_ptr<Space>> Space::Open(const std::string& uri, const Options

RETURN_NOT_OK(options.schema->Validate());
manifest = std::make_shared<Manifest>(options.schema);
RETURN_NOT_OK(SafeSaveManifest(fs, path, manifest.get()));
RETURN_NOT_OK(SafeSaveManifest(*fs, path, *manifest));
} else {
arrow::fs::FileInfo file_info;
auto max_cmp = [](arrow::fs::FileInfo& f1, arrow::fs::FileInfo& f2) {
Expand Down Expand Up @@ -293,13 +291,12 @@ Result<std::unique_ptr<Space>> Space::Open(const std::string& uri, const Options
return space;
}

Result<arrow::fs::FileInfoVector> Space::FindAllManifest(std::shared_ptr<arrow::fs::FileSystem> fs,
const std::string& path) {
Result<arrow::fs::FileInfoVector> Space::FindAllManifest(arrow::fs::FileSystem& fs, const std::string& path) {
arrow::fs::FileSelector selector;
selector.allow_not_found = true;
selector.base_dir = GetManifestDir(path);

ASSIGN_OR_RETURN_ARROW_NOT_OK(auto files, fs->GetFileInfo(selector));
ASSIGN_OR_RETURN_ARROW_NOT_OK(auto files, fs.GetFileInfo(selector));
std::vector<arrow::fs::FileInfo> info_vec;
std::copy_if(files.begin(), files.end(), std::back_inserter(info_vec),
[](arrow::fs::FileInfo& f) { return ParseVersionFromFileName(f.base_name()) != -1; });
Expand Down

0 comments on commit ec1bbbf

Please sign in to comment.