Skip to content

Commit

Permalink
feat(space): refactor and improve Storage Space functionality (#24)
Browse files Browse the repository at this point in the history
- add unit test for space by cpp.
- add conversion function for uri to path.
- add offset field when building scalar schema.
- change schema test for the new offset column.
  • Loading branch information
loloxwg committed Jul 24, 2023
1 parent 6f433ba commit b940932
Show file tree
Hide file tree
Showing 9 changed files with 152 additions and 132 deletions.
16 changes: 15 additions & 1 deletion cpp/src/common/fs_util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ Result<std::shared_ptr<arrow::fs::FileSystem>> BuildFileSystem(const std::string
RETURN_ARROW_NOT_OK(uri_parser.Parse(uri));
auto schema = uri_parser.scheme();
if (schema == "file") {
ASSIGN_OR_RETURN_ARROW_NOT_OK(auto option, arrow::fs::LocalFileSystemOptions::FromUri(uri_parser, nullptr));
auto output_path = uri_parser.path();
ASSIGN_OR_RETURN_ARROW_NOT_OK(auto option, arrow::fs::LocalFileSystemOptions::FromUri(uri_parser, &output_path));
return std::shared_ptr<arrow::fs::FileSystem>(new arrow::fs::LocalFileSystem(option));
}

Expand All @@ -29,4 +30,17 @@ Result<std::shared_ptr<arrow::fs::FileSystem>> BuildFileSystem(const std::string

return Status::InvalidArgument("Unsupported schema: " + schema);
}
/**
* Uri Convert to Path
*/
std::string UriToPath(const std::string& uri) {
arrow::internal::Uri uri_parser;
auto status = uri_parser.Parse(uri);

if (status.ok()) {
return uri_parser.path();
} else {
return std::string("");
}
}
}; // namespace milvus_storage
4 changes: 3 additions & 1 deletion cpp/src/common/fs_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,6 @@ namespace milvus_storage {

Result<std::shared_ptr<arrow::fs::FileSystem>> BuildFileSystem(const std::string& uri);

}
std::string UriToPath(const std::string& uri);

} // namespace milvus_storage
6 changes: 3 additions & 3 deletions cpp/src/common/utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -220,14 +220,14 @@ Result<std::shared_ptr<arrow::Schema>> FromProtobufSchema(const schema_proto::Ar
return res;
}

std::string GetNewParquetFilePath(std::string& path) {
std::string GetNewParquetFilePath(const std::string& path) {
auto scalar_file_id = boost::uuids::random_generator()();
return path + boost::uuids::to_string(scalar_file_id) + kParquetDataFileSuffix;
}

std::string GetManifestFilePath(std::string& path) { return path + kManifestFileName; }
std::string GetManifestFilePath(const std::string& path) { return path + kManifestFileName; }

std::string GetManifestTmpFilePath(std::string& path) { return path + kManifestTempFileName; }
std::string GetManifestTmpFilePath(const std::string& path) { return path + kManifestTempFileName; }

Result<std::shared_ptr<arrow::Schema>> ProjectSchema(std::shared_ptr<arrow::Schema> schema,
std::vector<std::string> columns) {
Expand Down
6 changes: 3 additions & 3 deletions cpp/src/common/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@ Result<std::unique_ptr<schema_proto::ArrowSchema>> ToProtobufSchema(const arrow:

Result<std::shared_ptr<arrow::Schema>> FromProtobufSchema(const schema_proto::ArrowSchema& schema);

std::string GetNewParquetFilePath(std::string& path);
std::string GetNewParquetFilePath(const std::string& path);

std::string GetManifestFilePath(std::string& path);
std::string GetManifestFilePath(const std::string& path);

std::string GetManifestTmpFilePath(std::string& path);
std::string GetManifestTmpFilePath(const std::string& path);

Result<std::shared_ptr<arrow::Schema>> ProjectSchema(std::shared_ptr<arrow::Schema> schema,
std::vector<std::string> columns);
Expand Down
2 changes: 2 additions & 0 deletions cpp/src/storage/schema.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ Status Schema::BuildScalarSchema() {
}
RETURN_ARROW_NOT_OK(scalar_schema_builder.AddField(field));
}
auto offset_field = std::make_shared<arrow::Field>("off_set", arrow::int64());
RETURN_ARROW_NOT_OK(scalar_schema_builder.AddField(offset_field));
ASSIGN_OR_RETURN_ARROW_NOT_OK(scalar_schema_, scalar_schema_builder.Finish());
return Status::OK();
}
Expand Down
30 changes: 16 additions & 14 deletions cpp/src/storage/space.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,31 +70,33 @@ Status Space::Write(arrow::RecordBatchReader* reader, WriteOption* option) {
}
}

// add offset column
// Only add offset column to scalar columns; vector columns not changed
std::vector<int64_t> offset_values(batch->num_rows());
std::iota(offset_values.begin(), offset_values.end(), 0);
arrow::NumericBuilder<arrow::Int64Type> builder;
auto offset_col = builder.AppendValues(offset_values);
scalar_cols.emplace_back(builder.Finish().ValueOrDie());
RETURN_ARROW_NOT_OK(builder.AppendValues(offset_values));
auto offset_col = builder.Finish().ValueOrDie();
scalar_cols.emplace_back(offset_col);

auto scalar_record = arrow::RecordBatch::Make(scalar_schema, batch->num_rows(), scalar_cols);
auto vector_record = arrow::RecordBatch::Make(vector_schema, batch->num_rows(), vector_cols);

if (!scalar_writer) {
auto scalar_file_path = GetNewParquetFilePath(manifest_->space_options()->uri);
if (scalar_writer == nullptr) {
auto scalar_file_path = GetNewParquetFilePath(UriToPath(manifest_->space_options()->uri));
scalar_writer = new ParquetFileWriter(scalar_schema, fs_, scalar_file_path);
RETURN_NOT_OK(scalar_writer->Init());
scalar_fragment.add_file(scalar_file_path);
}

auto vector_file_path = GetNewParquetFilePath(manifest_->space_options()->uri);
if (vector_writer == nullptr) {
auto vector_file_path = GetNewParquetFilePath(UriToPath(manifest_->space_options()->uri));
vector_writer = new ParquetFileWriter(vector_schema, fs_, vector_file_path);
RETURN_NOT_OK(scalar_writer->Init());

scalar_fragment.add_file(scalar_file_path);
RETURN_NOT_OK(vector_writer->Init());
vector_fragment.add_file(vector_file_path);
}

scalar_writer->Write(scalar_record.get());
vector_writer->Write(vector_record.get());
RETURN_NOT_OK(scalar_writer->Write(scalar_record.get()));
RETURN_NOT_OK(vector_writer->Write(vector_record.get()));

if (scalar_writer->count() >= option->max_record_per_file) {
scalar_writer->Close();
Expand All @@ -104,7 +106,7 @@ Status Space::Write(arrow::RecordBatchReader* reader, WriteOption* option) {
}
}

if (scalar_writer) {
if (scalar_writer != nullptr) {
scalar_writer->Close();
vector_writer->Close();
scalar_writer = nullptr;
Expand Down Expand Up @@ -173,8 +175,8 @@ std::unique_ptr<arrow::RecordBatchReader> Space::Read(std::shared_ptr<ReadOption
}

Status Space::SafeSaveManifest(const Manifest* manifest) {
auto tmp_manifest_file_path = GetManifestTmpFilePath(manifest->space_options()->uri);
auto manifest_file_path = GetManifestFilePath(manifest->space_options()->uri);
auto tmp_manifest_file_path = GetManifestTmpFilePath(UriToPath(manifest->space_options()->uri));
auto manifest_file_path = GetManifestFilePath(UriToPath(manifest->space_options()->uri));

ASSIGN_OR_RETURN_ARROW_NOT_OK(auto output, fs_->OpenOutputStream(tmp_manifest_file_path));
Manifest::WriteManifestFile(manifest, output.get());
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/storage/space.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ class RecordReader;

class Space {
public:
Space(std::shared_ptr<Schema> schema, std::shared_ptr<Options>& options);

Status Init();

Status Write(arrow::RecordBatchReader* reader, WriteOption* option);
Expand All @@ -26,8 +28,6 @@ class Space {
static std::unique_ptr<Space> Create();

private:
Space(std::shared_ptr<Schema> schema, std::shared_ptr<Options>& options);

Status SafeSaveManifest(const Manifest* manifest);

std::string base_path_;
Expand Down
32 changes: 14 additions & 18 deletions cpp/test/schema_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,10 @@ namespace milvus_storage {

TEST(SchemaValidateTest, SchemaValidateNoVersionColTest) {
// Create Fields
std::shared_ptr<arrow::KeyValueMetadata> metadata = arrow::KeyValueMetadata::Make(
std::vector<std::string>{"key1", "key2"}, std::vector<std::string>{"value1", "value2"});

std::shared_ptr<arrow::Field> pk_field = arrow::field("pk_field", arrow::int64(), /*nullable=*/false, metadata);

std::shared_ptr<arrow::Field> vec_field =
arrow::field("vec_field", arrow::fixed_size_binary(10), /*nullable=*/false, metadata);
auto metadata = arrow::KeyValueMetadata::Make(std::vector<std::string>{"key1", "key2"},
std::vector<std::string>{"value1", "value2"});
auto pk_field = arrow::field("pk_field", arrow::int64(), /*nullable=*/false, metadata);
auto vec_field = arrow::field("vec_field", arrow::fixed_size_binary(10), /*nullable=*/false, metadata);

// Create Arrow Schema
arrow::SchemaBuilder schema_builder;
Expand Down Expand Up @@ -44,8 +41,10 @@ TEST(SchemaValidateTest, SchemaValidateNoVersionColTest) {
ASSERT_TRUE(sp_status1.ok());

auto scalar_schema = space_schema1->scalar_schema();
ASSERT_EQ(scalar_schema->num_fields(), 1);
/// scalar schema has no version column but has offset column
ASSERT_EQ(scalar_schema->num_fields(), 2);
ASSERT_EQ(scalar_schema->field(0)->name(), schema_options->primary_column);
ASSERT_EQ(scalar_schema->field(1)->name(), "off_set");

auto vector_schema = space_schema1->vector_schema();
ASSERT_EQ(vector_schema->num_fields(), 2);
Expand All @@ -60,15 +59,11 @@ TEST(SchemaValidateTest, SchemaValidateNoVersionColTest) {

TEST(SchemaValidateTest, SchemaValidateVersionColTest) {
// Create Fields
std::shared_ptr<arrow::KeyValueMetadata> metadata = arrow::KeyValueMetadata::Make(
std::vector<std::string>{"key1", "key2"}, std::vector<std::string>{"value1", "value2"});

std::shared_ptr<arrow::Field> pk_field = arrow::field("pk_field", arrow::int64(), /*nullable=*/false, metadata);

std::shared_ptr<arrow::Field> ts_field = arrow::field("ts_field", arrow::int64(), /*nullable=*/false, metadata);

std::shared_ptr<arrow::Field> vec_field =
arrow::field("vec_field", arrow::fixed_size_binary(10), /*nullable=*/false, metadata);
auto metadata = arrow::KeyValueMetadata::Make(std::vector<std::string>{"key1", "key2"},
std::vector<std::string>{"value1", "value2"});
auto pk_field = arrow::field("pk_field", arrow::int64(), /*nullable=*/false, metadata);
auto ts_field = arrow::field("ts_field", arrow::int64(), /*nullable=*/false, metadata);
auto vec_field = arrow::field("vec_field", arrow::fixed_size_binary(10), /*nullable=*/false, metadata);

// Create Arrow Schema
arrow::SchemaBuilder schema_builder;
Expand Down Expand Up @@ -99,9 +94,10 @@ TEST(SchemaValidateTest, SchemaValidateVersionColTest) {
ASSERT_TRUE(sp_status1.ok());

auto scalar_schema = space_schema1->scalar_schema();
ASSERT_EQ(scalar_schema->num_fields(), 2);
ASSERT_EQ(scalar_schema->num_fields(), 3);
ASSERT_EQ(scalar_schema->field(0)->name(), schema_options->primary_column);
ASSERT_EQ(scalar_schema->field(1)->name(), schema_options->version_column);
ASSERT_EQ(scalar_schema->field(2)->name(), "off_set");

auto vector_schema = space_schema1->vector_schema();
ASSERT_EQ(vector_schema->num_fields(), 3);
Expand Down
Loading

0 comments on commit b940932

Please sign in to comment.