Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add blob interfaces in space #37

Merged
merged 1 commit into from
Aug 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cpp/src/common/constants.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ const std::string kManifestsDir = "versions";
const std::string kScalarDataDir = "scalar";
const std::string kVectorDataDir = "vector";
const std::string kDeleteDataDir = "delete";
const std::string kBlobDir = "blobs";
const std::string kParquetDataFileSuffix = ".parquet";
const std::string kOffsetFieldName = "__offset";

Expand Down
4 changes: 3 additions & 1 deletion cpp/src/common/status.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ std::string Status::ToString() const {
case kInternalStateError:
res = "InternalStateError: ";
break;
case kFileNotFound:
res = "FileNotFound: ";
default:
std::sprintf(tmp, "Unknown code(%d): ", code_);
res = tmp;
Expand All @@ -35,4 +37,4 @@ std::string Status::ToString() const {
return res;
}

} // namespace milvus_storage
} // namespace milvus_storage
6 changes: 3 additions & 3 deletions cpp/src/common/status.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ class Status {

static Status InternalStateError(const std::string& msg) { return Status(kInternalStateError, msg); }

static Status ManifestNotFound(const std::string& msg = "") { return Status(kManifestNotFound, msg); }
static Status FileNotFound(const std::string& msg = "") { return Status(kFileNotFound, msg); }

bool ok() const { return code_ == kOk; }

Expand All @@ -29,7 +29,7 @@ class Status {

bool IsInternalStateError() const { return code_ == kInternalStateError; }

bool IsManifestNotFound() const { return code_ == kManifestNotFound; }
bool IsManifestNotFound() const { return code_ == kFileNotFound; }

std::string ToString() const;

Expand All @@ -39,7 +39,7 @@ class Status {
kArrowError = 1,
kInvalidArgument = 2,
kInternalStateError = 3,
kManifestNotFound = 4,
kFileNotFound = 4,
};

explicit Status(Code code, const std::string& msg = "") : code_(code), msg_(msg) {}
Expand Down
9 changes: 9 additions & 0 deletions cpp/src/common/utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,15 @@ std::string GetManifestTmpFilePath(const std::string& path, const int64_t versio
return arrow::fs::internal::JoinAbstractPath(
std::vector<std::string_view>{path, kManifestsDir, std::to_string(version) + kManifestTempFileSuffix});
}
std::string GetBolbDir(const std::string& path) {
return arrow::fs::internal::JoinAbstractPath(std::vector<std::string_view>{path, kBlobDir});
}

std::string GetNewBlobFilePath(const std::string& path) {
auto scalar_file_id = boost::uuids::random_generator()();
return arrow::fs::internal::JoinAbstractPath(
std::vector<std::string_view>{path, kBlobDir, boost::uuids::to_string(scalar_file_id)});
}

int64_t ParseVersionFromFileName(const std::string& path) {
auto pos = path.find(kManifestFileSuffix);
Expand Down
3 changes: 3 additions & 0 deletions cpp/src/common/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,7 @@ std::string GetManifestDir(const std::string& path);
std::string GetScalarDataDir(const std::string& path);
std::string GetVectorDataDir(const std::string& path);
std::string GetDeleteDataDir(const std::string& path);
std::string GetBolbDir(const std::string& path);

std::string GetNewBlobFilePath(const std::string& path);
} // namespace milvus_storage
19 changes: 19 additions & 0 deletions cpp/src/storage/manifest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,31 @@ void Manifest::add_vector_fragment(Fragment&& fragment) { vector_fragments_.push

void Manifest::add_delete_fragment(Fragment&& fragment) { delete_fragments_.push_back(fragment); }

void Manifest::add_blob(Blob&& blob) { blobs_.emplace_back(blob); }

const FragmentVector& Manifest::scalar_fragments() const { return scalar_fragments_; }

const FragmentVector& Manifest::vector_fragments() const { return vector_fragments_; }

const FragmentVector& Manifest::delete_fragments() const { return delete_fragments_; }

bool Manifest::has_blob(std::string& name) {
auto iter = std::find_if(blobs_.begin(), blobs_.end(), [&](Blob& blob) { return blob.name == name; });
return iter != blobs_.end();
}

void Manifest::remove_blob_if_exist(std::string& name) {
std::remove_if(blobs_.begin(), blobs_.end(), [&](Blob& blob) { return blob.name == name; });
}

Result<Blob> Manifest::get_blob(std::string& name) {
auto iter = std::find_if(blobs_.begin(), blobs_.end(), [&](Blob& blob) { return blob.name == name; });
if (iter == blobs_.end()) {
return Status::FileNotFound("blob not found");
}
return *iter;
}

int64_t Manifest::version() const { return version_; }

void Manifest::set_version(int64_t version) { version_ = version; }
Expand Down
8 changes: 8 additions & 0 deletions cpp/src/storage/manifest.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,20 @@ class Manifest {

void add_delete_fragment(Fragment&& fragment);

void add_blob(Blob&& blob);

const FragmentVector& scalar_fragments() const;

const FragmentVector& vector_fragments() const;

const FragmentVector& delete_fragments() const;

bool has_blob(std::string& name);

void remove_blob_if_exist(std::string& name);

Result<Blob> get_blob(std::string& name);

int64_t version() const;

void set_version(int64_t version);
Expand Down
37 changes: 33 additions & 4 deletions cpp/src/storage/space.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@

#include <arrow/filesystem/filesystem.h>
#include <arrow/filesystem/type_fwd.h>
#include <arrow/status.h>
#include <algorithm>
#include <atomic>
#include <cstdint>
Expand Down Expand Up @@ -174,11 +175,39 @@ std::unique_ptr<arrow::RecordBatchReader> Space::Read(std::shared_ptr<ReadOption
return RecordReader::MakeRecordReader(manifest_, manifest_->schema(), fs_, delete_fragments_, option);
}

Status Space::WriteBolb(std::string name, char* blob, int64_t length, bool replace) {}
Status Space::WriteBolb(std::string name, void* blob, int64_t length, bool replace) {
if (!replace && manifest_->has_blob(name)) {
return Status::InvalidArgument("blob already exist");
}

std::string blob_file_path = GetNewBlobFilePath(path_);
ASSIGN_OR_RETURN_ARROW_NOT_OK(auto output, fs_->OpenOutputStream(blob_file_path));
RETURN_ARROW_NOT_OK(output->Write(blob, length));
RETURN_ARROW_NOT_OK(output->Close());

std::lock_guard<std::mutex> lock(mutex_);
auto next_version = next_manifest_version_++;
auto copied = new Manifest(*manifest_);
copied->remove_blob_if_exist(name);
copied->add_blob({name, length, blob_file_path});
RETURN_NOT_OK(SafeSaveManifest(fs_, path_, copied));
manifest_.reset(copied);
return Status::OK();
}

Status Space::ReadBlob(std::string name, char* target) {}
Status Space::ReadBlob(std::string name, void* target) {
auto manifest = manifest_;
ASSIGN_OR_RETURN_NOT_OK(auto blob, manifest->get_blob(name));
ASSIGN_OR_RETURN_ARROW_NOT_OK(auto file, fs_->OpenInputFile(blob.file));
ASSIGN_OR_RETURN_ARROW_NOT_OK(auto _, file->Read(blob.size, target));
return Status::OK();
}

Result<int64_t> Space::GetBlobByteSize(std::string name) {}
Result<int64_t> Space::GetBlobByteSize(std::string name) {
auto manifest = manifest_;
ASSIGN_OR_RETURN_NOT_OK(auto blob, manifest->get_blob(name));
return blob.size;
}

Status Space::SafeSaveManifest(std::shared_ptr<arrow::fs::FileSystem> fs,
const std::string& path,
Expand Down Expand Up @@ -233,7 +262,7 @@ Result<std::unique_ptr<Space>> Space::Open(const std::string& uri, Options optio
return ParseVersionFromFileName(f.base_name()) == options.version;
});
if (iter == info_vec.end()) {
return Status::ManifestNotFound();
return Status::FileNotFound();
}
file_info = *iter;
}
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/storage/space.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@ class Space {
static Result<std::unique_ptr<Space>> Open(const std::string& uri, Options options);

// Write a blob to space. Will return a error if replace is false and a blob with the same name exists.
Status WriteBolb(std::string name, char* blob, int64_t length, bool replace = false);
Status WriteBolb(std::string name, void* blob, int64_t length, bool replace = false);

// Read a blob from space, the target must have enough size to hold this blob.
Status ReadBlob(std::string name, char* target);
Status ReadBlob(std::string name, void* target);

// Get the byte size of a blob.
Result<int64_t> GetBlobByteSize(std::string name);
Expand Down