Skip to content

Commit

Permalink
[Cpp]: rewrite interfaces (#105)
Browse files Browse the repository at this point in the history
Signed-off-by: sunby <[email protected]>
  • Loading branch information
sunby committed Jan 3, 2024
1 parent 75720c3 commit 1f62f9c
Show file tree
Hide file tree
Showing 11 changed files with 63 additions and 71 deletions.
21 changes: 10 additions & 11 deletions cpp/include/milvus-storage/reader/common/combine_reader.h
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
// Copyright 2023 Zilliz
//
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//
// http://www.apache.org/licenses/LICENSE-2.0
//
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand All @@ -16,32 +16,31 @@

#include <memory>
#include "arrow/record_batch.h"
#include "common/result.h"
#include "storage/schema.h"

namespace milvus_storage {

// CombineReader merges scalar fields and vector fields to an entire record.
class CombineReader : public arrow::RecordBatchReader {
public:
static Result<std::shared_ptr<CombineReader>> Make(std::shared_ptr<arrow::RecordBatchReader> scalar_reader,
std::shared_ptr<arrow::RecordBatchReader> vector_reader,
std::shared_ptr<Schema> schema);
static std::unique_ptr<CombineReader> Make(std::unique_ptr<arrow::RecordBatchReader> scalar_reader,
std::unique_ptr<arrow::RecordBatchReader> vector_reader,
std::shared_ptr<Schema> schema);

std::shared_ptr<arrow::Schema> schema() const override;

arrow::Status ReadNext(std::shared_ptr<arrow::RecordBatch>* batch) override;

CombineReader(std::shared_ptr<arrow::RecordBatchReader> scalar_reader,
std::shared_ptr<arrow::RecordBatchReader> vector_reader,
CombineReader(std::unique_ptr<arrow::RecordBatchReader> scalar_reader,
std::unique_ptr<arrow::RecordBatchReader> vector_reader,
std::shared_ptr<Schema> schema)
: scalar_reader_(std::move(scalar_reader)),
vector_reader_(std::move(vector_reader)),
schema_(std::move(schema)) {}

private:
std::shared_ptr<arrow::RecordBatchReader> scalar_reader_;
std::shared_ptr<arrow::RecordBatchReader> vector_reader_;
std::unique_ptr<arrow::RecordBatchReader> scalar_reader_;
std::unique_ptr<arrow::RecordBatchReader> vector_reader_;
std::shared_ptr<Schema> schema_;
};
} // namespace milvus_storage
4 changes: 2 additions & 2 deletions cpp/include/milvus-storage/reader/merge_record_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ class MergeRecordReader : public arrow::RecordBatchReader {
std::shared_ptr<Schema> schema_;
const ReadOptions options_;

std::shared_ptr<arrow::RecordBatchReader> scalar_reader_;
std::shared_ptr<arrow::RecordBatchReader> vector_reader_;
std::unique_ptr<arrow::RecordBatchReader> scalar_reader_;
std::unique_ptr<arrow::RecordBatchReader> vector_reader_;
std::shared_ptr<arrow::RecordBatchReader> curr_reader_;
const DeleteFragmentVector delete_fragments_;
};
Expand Down
4 changes: 2 additions & 2 deletions cpp/include/milvus-storage/reader/record_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@ bool only_contain_vector_columns(std::shared_ptr<Schema> schema, const std::set<

bool filters_only_contain_pk_and_version(std::shared_ptr<Schema> schema, const Filter::FilterSet& filters);

Result<std::shared_ptr<arrow::RecordBatchReader>> MakeScanDataReader(std::shared_ptr<Manifest> manifest,
std::unique_ptr<arrow::RecordBatchReader> MakeScanDataReader(std::shared_ptr<Manifest> manifest,
std::shared_ptr<arrow::fs::FileSystem> fs);

std::shared_ptr<arrow::RecordBatchReader> MakeScanDeleteReader(std::shared_ptr<Manifest> manifest,
std::unique_ptr<arrow::RecordBatchReader> MakeScanDeleteReader(std::shared_ptr<Manifest> manifest,
std::shared_ptr<arrow::fs::FileSystem> fs);
} // namespace internal
} // namespace milvus_storage
8 changes: 4 additions & 4 deletions cpp/include/milvus-storage/storage/manifest.h
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
// Copyright 2023 Zilliz
//
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//
// http://www.apache.org/licenses/LICENSE-2.0
//
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand All @@ -25,7 +25,7 @@ class Manifest {
Manifest() = default;
explicit Manifest(std::shared_ptr<Schema> schema);

const std::shared_ptr<Schema> schema();
std::shared_ptr<Schema> schema();

void add_scalar_fragment(Fragment&& fragment);

Expand Down
10 changes: 5 additions & 5 deletions cpp/include/milvus-storage/storage/space.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,17 @@ class RecordReader;

class Space {
public:
Status Write(arrow::RecordBatchReader* reader, const WriteOption& option);
Status Write(arrow::RecordBatchReader& reader, const WriteOption& option);

std::unique_ptr<arrow::RecordBatchReader> Read(const ReadOptions& option) const;

// Scan delete files
Result<std::shared_ptr<arrow::RecordBatchReader>> ScanDelete() const;
std::unique_ptr<arrow::RecordBatchReader> ScanDelete() const;

// Scan data files without filtering deleted data
Result<std::shared_ptr<arrow::RecordBatchReader>> ScanData() const;
std::unique_ptr<arrow::RecordBatchReader> ScanData() const;

Status Delete(arrow::RecordBatchReader* reader);
Status Delete(arrow::RecordBatchReader& reader);

// Open opened a space or create if the space does not exist.
// If space does not exist. schema should not be nullptr, or an error will be returned.
Expand All @@ -54,7 +54,7 @@ class Space {
// Get the byte size of a blob.
Result<int64_t> GetBlobByteSize(const std::string& name) const;

std::vector<Blob> StatisticsBlobs() const;
const std::vector<Blob>& StatisticsBlobs() const;

std::shared_ptr<Schema> schema() const;

Expand Down
23 changes: 11 additions & 12 deletions cpp/src/reader/common/combine_reader.cpp
Original file line number Diff line number Diff line change
@@ -1,28 +1,26 @@
// Copyright 2023 Zilliz
//
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//
// http://www.apache.org/licenses/LICENSE-2.0
//
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "reader/common/combine_reader.h"
#include "common/macro.h"
#include <memory>
#include "arrow/type.h"
namespace milvus_storage {
Result<std::shared_ptr<CombineReader>> CombineReader::Make(std::shared_ptr<arrow::RecordBatchReader> scalar_reader,
std::shared_ptr<arrow::RecordBatchReader> vector_reader,
std::shared_ptr<Schema> schema) {
if (scalar_reader == nullptr || vector_reader == nullptr) {
return Status::InvalidArgument("null reader");
}
return std::make_shared<CombineReader>(scalar_reader, vector_reader, schema);
std::unique_ptr<CombineReader> CombineReader::Make(std::unique_ptr<arrow::RecordBatchReader> scalar_reader,
std::unique_ptr<arrow::RecordBatchReader> vector_reader,
std::shared_ptr<Schema> schema) {
assert(scalar_reader != nullptr && vector_reader != nullptr);
return std::make_unique<CombineReader>(std::move(scalar_reader), std::move(vector_reader), schema);
}

std::shared_ptr<arrow::Schema> CombineReader::schema() const { return schema_->schema(); }
Expand All @@ -48,4 +46,5 @@ arrow::Status CombineReader::ReadNext(std::shared_ptr<arrow::RecordBatch>* batch
*batch = arrow::RecordBatch::Make(schema(), scalar_batch->num_rows(), std::move(columns));
return arrow::Status::OK();
}
} // namespace milvus_storage

} // namespace milvus_storage
14 changes: 7 additions & 7 deletions cpp/src/reader/merge_record_reader.cpp
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
// Copyright 2023 Zilliz
//
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//
// http://www.apache.org/licenses/LICENSE-2.0
//
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand Down Expand Up @@ -36,8 +36,8 @@ MergeRecordReader::MergeRecordReader(const ReadOptions& options,
std::shared_ptr<arrow::fs::FileSystem> fs,
std::shared_ptr<Schema> schema)
: schema_(schema), fs_(fs), options_(options), delete_fragments_(delete_fragments) {
scalar_reader_ = std::make_shared<MultiFilesSequentialReader>(fs, scalar_fragments, schema->scalar_schema(), options);
vector_reader_ = std::make_shared<MultiFilesSequentialReader>(fs, vector_fragments, schema->vector_schema(), options);
scalar_reader_ = std::make_unique<MultiFilesSequentialReader>(fs, scalar_fragments, schema->scalar_schema(), options);
vector_reader_ = std::make_unique<MultiFilesSequentialReader>(fs, vector_fragments, schema->vector_schema(), options);
}

std::shared_ptr<arrow::Schema> MergeRecordReader::schema() const {
Expand Down Expand Up @@ -73,8 +73,8 @@ arrow::Status MergeRecordReader::ReadNext(std::shared_ptr<arrow::RecordBatch>* b
}

Result<std::shared_ptr<arrow::RecordBatchReader>> MergeRecordReader::MakeInnerReader() {
ASSIGN_OR_RETURN_NOT_OK(auto combine_reader, CombineReader::Make(scalar_reader_, vector_reader_, schema_));
auto delete_reader = DeleteMergeReader::Make(combine_reader, schema_->options(), delete_fragments_, options_);
auto combine_reader = CombineReader::Make(std::move(scalar_reader_), std::move(vector_reader_), schema_);
auto delete_reader = DeleteMergeReader::Make(std::move(combine_reader), schema_->options(), delete_fragments_, options_);
ASSIGN_OR_RETURN_NOT_OK(auto res, ProjectionReader::Make(schema(), delete_reader, options_));
return res;
}
Expand Down
29 changes: 12 additions & 17 deletions cpp/src/reader/record_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,7 @@ std::unique_ptr<arrow::RecordBatchReader> MakeRecordReader(std::shared_ptr<Manif
return std::make_unique<FilterQueryRecordReader>(options, scalar_data, vector_data, delete_fragments, fs, schema);
}

bool only_contain_scalar_columns(const std::shared_ptr<Schema> schema,
const std::set<std::string>& related_columns) {
bool only_contain_scalar_columns(const std::shared_ptr<Schema> schema, const std::set<std::string>& related_columns) {
for (auto& column : related_columns) {
if (schema->options()->vector_column == column) {
return false;
Expand All @@ -82,8 +81,7 @@ bool only_contain_scalar_columns(const std::shared_ptr<Schema> schema,
return true;
}

bool only_contain_vector_columns(const std::shared_ptr<Schema> schema,
const std::set<std::string>& related_columns) {
bool only_contain_vector_columns(const std::shared_ptr<Schema> schema, const std::set<std::string>& related_columns) {
for (auto& column : related_columns) {
if (schema->options()->vector_column != column && schema->options()->primary_column != column &&
schema->options()->version_column != column) {
Expand All @@ -93,8 +91,7 @@ bool only_contain_vector_columns(const std::shared_ptr<Schema> schema,
return true;
}

bool filters_only_contain_pk_and_version(std::shared_ptr<Schema> schema,
const Filter::FilterSet& filters) {
bool filters_only_contain_pk_and_version(std::shared_ptr<Schema> schema, const Filter::FilterSet& filters) {
for (auto& filter : filters) {
if (filter->get_column_name() != schema->options()->primary_column &&
filter->get_column_name() != schema->options()->version_column) {
Expand All @@ -104,22 +101,20 @@ bool filters_only_contain_pk_and_version(std::shared_ptr<Schema> schema,
return true;
}

Result<std::shared_ptr<arrow::RecordBatchReader>> MakeScanDataReader(
std::shared_ptr<Manifest> manifest, std::shared_ptr<arrow::fs::FileSystem> fs) {
auto scalar_reader = std::make_shared<MultiFilesSequentialReader>(fs, manifest->scalar_fragments(),
std::unique_ptr<arrow::RecordBatchReader> MakeScanDataReader(std::shared_ptr<Manifest> manifest,
std::shared_ptr<arrow::fs::FileSystem> fs) {
auto scalar_reader = std::make_unique<MultiFilesSequentialReader>(fs, manifest->scalar_fragments(),
manifest->schema()->scalar_schema(), ReadOptions());
auto vector_reader = std::make_shared<MultiFilesSequentialReader>(fs, manifest->vector_fragments(),
auto vector_reader = std::make_unique<MultiFilesSequentialReader>(fs, manifest->vector_fragments(),
manifest->schema()->vector_schema(), ReadOptions());

ASSIGN_OR_RETURN_NOT_OK(auto combine_reader, CombineReader::Make(scalar_reader, vector_reader, manifest->schema()));
return std::static_pointer_cast<arrow::RecordBatchReader>(combine_reader);
return CombineReader::Make(std::move(scalar_reader), std::move(vector_reader), manifest->schema());
}

std::shared_ptr<arrow::RecordBatchReader> MakeScanDeleteReader(
std::shared_ptr<Manifest> manifest, std::shared_ptr<arrow::fs::FileSystem> fs) {
auto reader = std::make_shared<MultiFilesSequentialReader>(fs, manifest->delete_fragments(),
manifest->schema()->delete_schema(), ReadOptions());
return reader;
std::unique_ptr<arrow::RecordBatchReader> MakeScanDeleteReader(std::shared_ptr<Manifest> manifest,
std::shared_ptr<arrow::fs::FileSystem> fs) {
return std::make_unique<MultiFilesSequentialReader>(fs, manifest->delete_fragments(),
manifest->schema()->delete_schema(), ReadOptions());
}
} // namespace internal
} // namespace milvus_storage
2 changes: 1 addition & 1 deletion cpp/src/storage/manifest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ namespace milvus_storage {

Manifest::Manifest(std::shared_ptr<Schema> schema) : schema_(std::move(schema)) {}

const std::shared_ptr<Schema> Manifest::schema() { return schema_; }
std::shared_ptr<Schema> Manifest::schema() { return schema_; }

void Manifest::add_scalar_fragment(Fragment&& fragment) { scalar_fragments_.push_back(fragment); }

Expand Down
17 changes: 8 additions & 9 deletions cpp/src/storage/space.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
#include "common/log.h"
#include "common/macro.h"
#include "file/delete_fragment.h"
#include "filter/constant_filter.h"
#include "format/parquet/file_writer.h"
#include "storage/space.h"
#include "common/utils.h"
Expand All @@ -47,8 +46,8 @@ Status Space::Init() {
return Status::OK();
}

Status Space::Write(arrow::RecordBatchReader* reader, const WriteOption& option) {
if (!reader->schema()->Equals(*this->manifest_->schema()->schema())) {
Status Space::Write(arrow::RecordBatchReader& reader, const WriteOption& option) {
if (!reader.schema()->Equals(*this->manifest_->schema()->schema())) {
return Status::InvalidArgument("Schema not match");
}

Expand All @@ -65,7 +64,7 @@ Status Space::Write(arrow::RecordBatchReader* reader, const WriteOption& option)
Fragment scalar_fragment;
Fragment vector_fragment;

for (auto rec = reader->Next(); rec.ok(); rec = reader->Next()) {
for (auto rec = reader.Next(); rec.ok(); rec = reader.Next()) {
auto batch = rec.ValueOrDie();
if (batch == nullptr) {
break;
Expand Down Expand Up @@ -140,12 +139,12 @@ Status Space::Write(arrow::RecordBatchReader* reader, const WriteOption& option)
return Status::OK();
}

Status Space::Delete(arrow::RecordBatchReader* reader) {
Status Space::Delete(arrow::RecordBatchReader& reader) {
FileWriter* writer = nullptr;
Fragment fragment;
auto delete_fragment = std::make_shared<DeleteFragment>(fs_, manifest_->schema());
std::string delete_file;
for (auto rec = reader->Next(); rec.ok(); rec = reader->Next()) {
for (auto rec = reader.Next(); rec.ok(); rec = reader.Next()) {
auto batch = rec.ValueOrDie();
if (batch == nullptr) {
break;
Expand Down Expand Up @@ -307,13 +306,13 @@ Result<arrow::fs::FileInfoVector> Space::FindAllManifest(std::shared_ptr<arrow::
return info_vec;
}

std::vector<Blob> Space::StatisticsBlobs() const { return manifest_->blobs(); }
const std::vector<Blob>& Space::StatisticsBlobs() const { return manifest_->blobs(); }

Result<std::shared_ptr<arrow::RecordBatchReader>> Space::ScanDelete() const {
std::unique_ptr<arrow::RecordBatchReader> Space::ScanDelete() const {
return internal::MakeScanDeleteReader(manifest_, fs_);
}

Result<std::shared_ptr<arrow::RecordBatchReader>> Space::ScanData() const {
std::unique_ptr<arrow::RecordBatchReader> Space::ScanData() const {
return internal::MakeScanDataReader(manifest_, fs_);
}

Expand Down
2 changes: 1 addition & 1 deletion cpp/test/space_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ TEST(SpaceTest, SpaceWriteReadTest) {
auto reader = arrow::RecordBatchReader::Make({rec_batch}, arrow_schema).ValueOrDie();

WriteOption write_option{10};
space->Write(reader.get(), write_option);
space->Write(*reader, write_option);

ConstantFilter filter(EQUAL, "pk_field", Value::Int64(1));
ReadOptions read_options;
Expand Down

0 comments on commit 1f62f9c

Please sign in to comment.