Skip to content

Commit

Permalink
Add blob interfaces in space (#37)
Browse files Browse the repository at this point in the history
Signed-off-by: sunby <[email protected]>
  • Loading branch information
sunby committed Aug 9, 2023
1 parent b281238 commit 8743cb4
Show file tree
Hide file tree
Showing 9 changed files with 81 additions and 10 deletions.
1 change: 1 addition & 0 deletions cpp/src/common/constants.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ const std::string kManifestsDir = "versions";
const std::string kScalarDataDir = "scalar";
const std::string kVectorDataDir = "vector";
const std::string kDeleteDataDir = "delete";
// Name of the sub-directory, joined onto a space's root path, under which blob files are created.
const std::string kBlobDir = "blobs";
const std::string kParquetDataFileSuffix = ".parquet";
const std::string kOffsetFieldName = "__offset";

Expand Down
4 changes: 3 additions & 1 deletion cpp/src/common/status.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ std::string Status::ToString() const {
case kInternalStateError:
res = "InternalStateError: ";
break;
case kFileNotFound:
res = "FileNotFound: ";
default:
std::sprintf(tmp, "Unknown code(%d): ", code_);
res = tmp;
Expand All @@ -35,4 +37,4 @@ std::string Status::ToString() const {
return res;
}

} // namespace milvus_storage
} // namespace milvus_storage
6 changes: 3 additions & 3 deletions cpp/src/common/status.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ class Status {

// Builds a Status carrying the kInternalStateError code and the given message.
static Status InternalStateError(const std::string& msg) {
  return Status(kInternalStateError, msg);
}

static Status ManifestNotFound(const std::string& msg = "") { return Status(kManifestNotFound, msg); }
// Builds a Status carrying the kFileNotFound code; the message is optional.
static Status FileNotFound(const std::string& msg = "") {
  return Status(kFileNotFound, msg);
}

// True when this status carries the kOk code.
bool ok() const {
  return code_ == kOk;
}

Expand All @@ -29,7 +29,7 @@ class Status {

// True when this status carries the kInternalStateError code.
bool IsInternalStateError() const {
  return code_ == kInternalStateError;
}

bool IsManifestNotFound() const { return code_ == kManifestNotFound; }
bool IsManifestNotFound() const { return code_ == kFileNotFound; }

std::string ToString() const;

Expand All @@ -39,7 +39,7 @@ class Status {
kArrowError = 1,
kInvalidArgument = 2,
kInternalStateError = 3,
kManifestNotFound = 4,
kFileNotFound = 4,
};

// Constructs a Status from a code and a message; the message defaults to empty.
explicit Status(Code code, const std::string& msg = "")
    : code_(code), msg_(msg) {}
Expand Down
9 changes: 9 additions & 0 deletions cpp/src/common/utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,15 @@ std::string GetManifestTmpFilePath(const std::string& path, const int64_t versio
return arrow::fs::internal::JoinAbstractPath(
std::vector<std::string_view>{path, kManifestsDir, std::to_string(version) + kManifestTempFileSuffix});
}
// Returns `path` joined with the blob directory name (kBlobDir).
std::string GetBolbDir(const std::string& path) {
  const std::vector<std::string_view> segments{path, kBlobDir};
  return arrow::fs::internal::JoinAbstractPath(segments);
}

// Returns a fresh file path of the form <path>/<kBlobDir>/<random uuid>.
// A new UUID is generated on every call.
std::string GetNewBlobFilePath(const std::string& path) {
  const std::string blob_file_name = boost::uuids::to_string(boost::uuids::random_generator()());
  const std::vector<std::string_view> segments{path, kBlobDir, blob_file_name};
  return arrow::fs::internal::JoinAbstractPath(segments);
}

int64_t ParseVersionFromFileName(const std::string& path) {
auto pos = path.find(kManifestFileSuffix);
Expand Down
3 changes: 3 additions & 0 deletions cpp/src/common/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,7 @@ std::string GetManifestDir(const std::string& path);
std::string GetScalarDataDir(const std::string& path);
std::string GetVectorDataDir(const std::string& path);
std::string GetDeleteDataDir(const std::string& path);
std::string GetBolbDir(const std::string& path);

std::string GetNewBlobFilePath(const std::string& path);
} // namespace milvus_storage
19 changes: 19 additions & 0 deletions cpp/src/storage/manifest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,31 @@ void Manifest::add_vector_fragment(Fragment&& fragment) { vector_fragments_.push

// Appends `fragment` to the list of delete fragments.
void Manifest::add_delete_fragment(Fragment&& fragment) {
  delete_fragments_.push_back(fragment);
}

// Appends `blob` to this manifest's blob list.
// NOTE: `blob` is a named rvalue reference, i.e. an lvalue inside this body,
// so the stored element is copy-constructed from it.
void Manifest::add_blob(Blob&& blob) {
  blobs_.push_back(blob);
}

// Read-only access to the scalar fragments held by this manifest.
const FragmentVector& Manifest::scalar_fragments() const {
  return scalar_fragments_;
}

// Read-only access to the vector fragments held by this manifest.
const FragmentVector& Manifest::vector_fragments() const {
  return vector_fragments_;
}

// Read-only access to the delete fragments held by this manifest.
const FragmentVector& Manifest::delete_fragments() const {
  return delete_fragments_;
}

bool Manifest::has_blob(std::string& name) {
auto iter = std::find_if(blobs_.begin(), blobs_.end(), [&](Blob& blob) { return blob.name == name; });
return iter != blobs_.end();
}

// Removes every blob entry whose `name` field equals `name`; does nothing when
// no such entry exists.
void Manifest::remove_blob_if_exist(std::string& name) {
  // std::remove_if only moves the surviving elements to the front and returns
  // the new logical end; the tail must be erased explicitly, otherwise the
  // container keeps its size and ends with stale entries.
  auto new_end = std::remove_if(blobs_.begin(), blobs_.end(), [&](Blob& blob) { return blob.name == name; });
  blobs_.erase(new_end, blobs_.end());
}

// Looks up the first blob entry whose `name` field equals `name`.
// Returns a copy of that entry, or Status::FileNotFound("blob not found") when
// there is none.
Result<Blob> Manifest::get_blob(std::string& name) {
  for (Blob& candidate : blobs_) {
    if (candidate.name == name) {
      return candidate;
    }
  }
  return Status::FileNotFound("blob not found");
}

// Returns the version number stored in this manifest.
int64_t Manifest::version() const {
  return version_;
}

// Overwrites the version number stored in this manifest.
void Manifest::set_version(int64_t version) {
  version_ = version;
}
Expand Down
8 changes: 8 additions & 0 deletions cpp/src/storage/manifest.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,20 @@ class Manifest {

void add_delete_fragment(Fragment&& fragment);

void add_blob(Blob&& blob);

const FragmentVector& scalar_fragments() const;

const FragmentVector& vector_fragments() const;

const FragmentVector& delete_fragments() const;

bool has_blob(std::string& name);

void remove_blob_if_exist(std::string& name);

Result<Blob> get_blob(std::string& name);

int64_t version() const;

void set_version(int64_t version);
Expand Down
37 changes: 33 additions & 4 deletions cpp/src/storage/space.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@

#include <arrow/filesystem/filesystem.h>
#include <arrow/filesystem/type_fwd.h>
#include <arrow/status.h>
#include <algorithm>
#include <atomic>
#include <cstdint>
Expand Down Expand Up @@ -174,11 +175,39 @@ std::unique_ptr<arrow::RecordBatchReader> Space::Read(std::shared_ptr<ReadOption
return RecordReader::MakeRecordReader(manifest_, manifest_->schema(), fs_, delete_fragments_, option);
}

Status Space::WriteBolb(std::string name, char* blob, int64_t length, bool replace) {}
// Writes `length` bytes starting at `blob` into a newly named blob file and
// publishes a new manifest in which `name` refers to that file.
//
// Returns InvalidArgument when `length` is negative, when `blob` is null while
// `length` is positive, or when `replace` is false and a blob called `name` is
// already registered. Errors from the filesystem calls and from saving the
// manifest are propagated through the RETURN_* macros.
Status Space::WriteBolb(std::string name, void* blob, int64_t length, bool replace) {
  if (length < 0 || (blob == nullptr && length > 0)) {
    return Status::InvalidArgument("invalid blob buffer or length");
  }
  // Early rejection before any file is written; checked again under the lock.
  if (!replace && manifest_->has_blob(name)) {
    return Status::InvalidArgument("blob already exist");
  }

  std::string blob_file_path = GetNewBlobFilePath(path_);
  ASSIGN_OR_RETURN_ARROW_NOT_OK(auto output, fs_->OpenOutputStream(blob_file_path));
  RETURN_ARROW_NOT_OK(output->Write(blob, length));
  RETURN_ARROW_NOT_OK(output->Close());

  std::lock_guard<std::mutex> lock(mutex_);
  // Another writer may have registered `name` while the file was being
  // written, so the non-replace rule is enforced again now that the lock is held.
  if (!replace && manifest_->has_blob(name)) {
    return Status::InvalidArgument("blob already exist");
  }

  auto next_version = next_manifest_version_++;
  // Owned by a unique_ptr so the copy is released if saving fails below.
  std::unique_ptr<Manifest> copied(new Manifest(*manifest_));
  // NOTE(review): the version was previously computed but never applied to the
  // copy; presumably SafeSaveManifest derives the file name from it - confirm.
  copied->set_version(next_version);
  copied->remove_blob_if_exist(name);
  copied->add_blob({name, length, blob_file_path});
  RETURN_NOT_OK(SafeSaveManifest(fs_, path_, copied.get()));
  manifest_.reset(copied.release());
  return Status::OK();
}

Status Space::ReadBlob(std::string name, char* target) {}
// Copies the bytes of the blob registered as `name` into `target`.
//
// `target` must be able to hold the blob's recorded size (see
// GetBlobByteSize). Returns the lookup error from the manifest when no blob is
// registered under `name`, InvalidArgument when `target` is null for a
// non-empty blob, and InternalStateError when fewer bytes than recorded could
// be read. Filesystem errors are propagated through the macros.
Status Space::ReadBlob(std::string name, void* target) {
  // Work on a local copy of the manifest pointer for the duration of the call.
  auto manifest = manifest_;
  ASSIGN_OR_RETURN_NOT_OK(auto blob, manifest->get_blob(name));
  if (target == nullptr && blob.size > 0) {
    return Status::InvalidArgument("target buffer is null");
  }
  ASSIGN_OR_RETURN_ARROW_NOT_OK(auto file, fs_->OpenInputFile(blob.file));
  ASSIGN_OR_RETURN_ARROW_NOT_OK(auto bytes_read, file->Read(blob.size, target));
  // Read reports how many bytes were actually delivered; a short read would
  // leave `target` only partially filled, so it must not be reported as success.
  if (bytes_read != blob.size) {
    return Status::InternalStateError("blob file is shorter than the size recorded in the manifest");
  }
  return Status::OK();
}

Result<int64_t> Space::GetBlobByteSize(std::string name) {}
// Returns the `size` recorded in the manifest for the blob registered as
// `name`, or the lookup error from the manifest when no such blob exists.
Result<int64_t> Space::GetBlobByteSize(std::string name) {
  // Query through a local copy of the manifest pointer.
  auto snapshot = manifest_;
  ASSIGN_OR_RETURN_NOT_OK(auto found, snapshot->get_blob(name));
  return found.size;
}

Status Space::SafeSaveManifest(std::shared_ptr<arrow::fs::FileSystem> fs,
const std::string& path,
Expand Down Expand Up @@ -233,7 +262,7 @@ Result<std::unique_ptr<Space>> Space::Open(const std::string& uri, Options optio
return ParseVersionFromFileName(f.base_name()) == options.version;
});
if (iter == info_vec.end()) {
return Status::ManifestNotFound();
return Status::FileNotFound();
}
file_info = *iter;
}
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/storage/space.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@ class Space {
static Result<std::unique_ptr<Space>> Open(const std::string& uri, Options options);

// Write a blob to space. Returns an error if replace is false and a blob with the same name already exists.
Status WriteBolb(std::string name, char* blob, int64_t length, bool replace = false);
Status WriteBolb(std::string name, void* blob, int64_t length, bool replace = false);

// Read a blob from space, the target must have enough size to hold this blob.
Status ReadBlob(std::string name, char* target);
Status ReadBlob(std::string name, void* target);

// Get the byte size of a blob.
Result<int64_t> GetBlobByteSize(std::string name);
Expand Down

0 comments on commit 8743cb4

Please sign in to comment.