Skip to content

Commit

Permalink
Add Space::Open (#27)
Browse files Browse the repository at this point in the history
Signed-off-by: sunby <[email protected]>
  • Loading branch information
sunby committed Jul 26, 2023
1 parent 057d6ee commit ac9653f
Show file tree
Hide file tree
Showing 14 changed files with 184 additions and 113 deletions.
2 changes: 1 addition & 1 deletion cpp/src/common/constants.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ namespace milvus_storage {
const int kReadBatchSize = 1024;

const std::string kManifestTempFileSuffix = ".manifest.tmp";
const std::string kManifestFileSuffix = "manifest";
const std::string kManifestFileSuffix = ".manifest";
const std::string kManifestsDir = "versions";
const std::string kParquetDataFileSuffix = ".parquet";
const std::string kOffsetFieldName = "__offset";
Expand Down
7 changes: 6 additions & 1 deletion cpp/src/common/status.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ class Status {

static Status InternalStateError(const std::string& msg) { return Status(kInternalStateError, msg); }

static Status ManifestNotFound(const std::string& msg = "") { return Status(kManifestNotFound, msg); }

bool ok() const { return code_ == kOk; }

bool IsArrowError() const { return code_ == kArrowError; }
Expand All @@ -27,6 +29,8 @@ class Status {

bool IsInternalStateError() const { return code_ == kInternalStateError; }

bool IsManifestNotFound() const { return code_ == kManifestNotFound; }

std::string ToString() const;

private:
Expand All @@ -35,11 +39,12 @@ class Status {
kArrowError = 1,
kInvalidArgument = 2,
kInternalStateError = 3,
kManifestNotFound = 4,
};

explicit Status(Code code, const std::string& msg = "") : code_(code), msg_(msg) {}

Code code_;
std::string msg_;
};
} // namespace milvus_storage
} // namespace milvus_storage
17 changes: 15 additions & 2 deletions cpp/src/common/utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include "constants.h"
#include "macro.h"
#include "arrow/filesystem/path_util.h"
#include <cstdlib>
namespace milvus_storage {

Result<schema_proto::LogicType> ToProtobufType(arrow::Type::type type) {
Expand Down Expand Up @@ -228,12 +229,21 @@ std::string GetNewParquetFilePath(const std::string& path) {

std::string GetManifestFilePath(const std::string& path, const int64_t version) {
return arrow::fs::internal::JoinAbstractPath(
std::vector<std::string>{path, kManifestsDir, std::to_string(version) + kManifestFileSuffix});
std::vector<std::string_view>{path, kManifestsDir, std::to_string(version) + kManifestFileSuffix});
}

std::string GetManifestTmpFilePath(const std::string& path, const int64_t version) {
return arrow::fs::internal::JoinAbstractPath(
std::vector<std::string>{path, kManifestsDir, std::to_string(version) + kManifestTempFileSuffix});
std::vector<std::string_view>{path, kManifestsDir, std::to_string(version) + kManifestTempFileSuffix});
}

int64_t ParseVersionFromFileName(const std::string& path) {
auto pos = path.find(kManifestFileSuffix);
if (pos == std::string::npos || !path.ends_with(kManifestFileSuffix)) {
return -1;
}
auto version = path.substr(0, pos);
return std::atol(version.c_str());
}

Result<std::shared_ptr<arrow::Schema>> ProjectSchema(std::shared_ptr<arrow::Schema> schema,
Expand All @@ -251,4 +261,7 @@ Result<std::shared_ptr<arrow::Schema>> ProjectSchema(std::shared_ptr<arrow::Sche
return projection_schema;
}

std::string GetManifestDir(const std::string& path) {
return arrow::fs::internal::JoinAbstractPath(std::vector<std::string_view>{path, kManifestsDir});
}
} // namespace milvus_storage
4 changes: 4 additions & 0 deletions cpp/src/common/utils.h
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#include <arrow/filesystem/filesystem.h>
#include "arrow/type.h"
#include <memory>
#include <string>
Expand All @@ -20,4 +21,7 @@ std::string GetManifestTmpFilePath(const std::string& path, int64_t version);
Result<std::shared_ptr<arrow::Schema>> ProjectSchema(std::shared_ptr<arrow::Schema> schema,
std::vector<std::string> columns);

int64_t ParseVersionFromFileName(const std::string& path);

std::string GetManifestDir(const std::string& path);
} // namespace milvus_storage
29 changes: 21 additions & 8 deletions cpp/src/storage/manifest.cpp
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
#include "storage/manifest.h"
#include <algorithm>
#include <memory>
#include "arrow/filesystem/filesystem.h"

namespace milvus_storage {

Manifest::Manifest(std::shared_ptr<Options> options, std::shared_ptr<Schema> schema)
: options_(std::move(options)), schema_(std::move(schema)) {}
Manifest::Manifest(std::shared_ptr<Schema> schema) : schema_(std::move(schema)) {}

const std::shared_ptr<Schema> Manifest::schema() { return schema_; }

Expand All @@ -24,12 +25,9 @@ int64_t Manifest::version() const { return version_; }

void Manifest::set_version(int64_t version) { version_ = version; }

const std::shared_ptr<Options> Manifest::space_options() const { return options_; }

Result<manifest_proto::Manifest> Manifest::ToProtobuf() const {
manifest_proto::Manifest manifest;
manifest.set_version(version_);
manifest.set_allocated_options(options_->ToProtobuf().release());
for (auto& fragment : vector_fragments_) {
manifest.mutable_vector_fragments()->AddAllocated(fragment.ToProtobuf().release());
}
Expand All @@ -46,9 +44,6 @@ Result<manifest_proto::Manifest> Manifest::ToProtobuf() const {
}

void Manifest::FromProtobuf(const manifest_proto::Manifest& manifest) {
options_ = std::make_shared<Options>();
options_->FromProtobuf(manifest.options());

schema_ = std::make_shared<Schema>();
schema_->FromProtobuf(manifest.schema());

Expand All @@ -75,4 +70,22 @@ Status Manifest::WriteManifestFile(const Manifest* manifest, arrow::io::OutputSt
delete[] buffer;
return Status::OK();
}

Result<std::shared_ptr<Manifest>> Manifest::ParseFromFile(std::shared_ptr<arrow::io::InputStream> istream,
arrow::fs::FileInfo& file_info) {
auto size = file_info.size();
char* buffer = new char[size];
auto res = istream->Read(size, buffer);
if (!res.ok()) {
delete[] buffer;
return Status::ArrowError(res.status().ToString());
}

manifest_proto::Manifest proto_manifest;
proto_manifest.ParseFromArray(buffer, size);
auto manifest = std::make_shared<Manifest>();
manifest->FromProtobuf(proto_manifest);
delete[] buffer;
return manifest;
}
} // namespace milvus_storage
9 changes: 5 additions & 4 deletions cpp/src/storage/manifest.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@

#include "storage/schema.h"
#include "file/fragment.h"
#include "arrow/filesystem/filesystem.h"
namespace milvus_storage {

class Manifest {
public:
Manifest() = default;
explicit Manifest(std::shared_ptr<Options> options, std::shared_ptr<Schema> schema);
explicit Manifest(std::shared_ptr<Schema> schema);

const std::shared_ptr<Schema> schema();

Expand All @@ -27,16 +28,16 @@ class Manifest {

void set_version(int64_t version);

const std::shared_ptr<Options> space_options() const;

Result<manifest_proto::Manifest> ToProtobuf() const;

void FromProtobuf(const manifest_proto::Manifest& manifest);

static Status WriteManifestFile(const Manifest* manifest, arrow::io::OutputStream* output);

static Result<std::shared_ptr<Manifest>> ParseFromFile(std::shared_ptr<arrow::io::InputStream> istream,
arrow::fs::FileInfo& file_info);

private:
std::shared_ptr<Options> options_;
std::shared_ptr<Schema> schema_;
FragmentVector scalar_fragments_;
FragmentVector vector_fragments_;
Expand Down
8 changes: 0 additions & 8 deletions cpp/src/storage/options.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,6 @@

namespace milvus_storage {

std::unique_ptr<manifest_proto::Options> Options::ToProtobuf() {
auto options = std::make_unique<manifest_proto::Options>();
options->set_uri(uri);
return options;
}

void Options::FromProtobuf(const manifest_proto::Options& options) { uri = options.uri(); }

Status SchemaOptions::Validate(const arrow::Schema* schema) {
if (!primary_column.empty()) {
auto primary_field = schema->GetFieldByName(primary_column);
Expand Down
10 changes: 3 additions & 7 deletions cpp/src/storage/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,10 @@

namespace milvus_storage {

class Schema;
struct Options {
std::string uri;

bool operator==(const Options& other) const { return uri == other.uri; }

std::unique_ptr<manifest_proto::Options> ToProtobuf();

void FromProtobuf(const manifest_proto::Options& options);
std::shared_ptr<Schema> schema = nullptr;
int64_t version = -1;
};

struct WriteOption {
Expand Down
Loading

0 comments on commit ac9653f

Please sign in to comment.