diff --git a/cpp/include/milvus-storage/reader/common/combine_reader.h b/cpp/include/milvus-storage/reader/common/combine_reader.h index 72e44597..ba4fd168 100644 --- a/cpp/include/milvus-storage/reader/common/combine_reader.h +++ b/cpp/include/milvus-storage/reader/common/combine_reader.h @@ -1,11 +1,11 @@ // Copyright 2023 Zilliz -// +// // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at -// +// // http://www.apache.org/licenses/LICENSE-2.0 -// +// // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -16,7 +16,6 @@ #include #include "arrow/record_batch.h" -#include "common/result.h" #include "storage/schema.h" namespace milvus_storage { @@ -24,24 +23,24 @@ namespace milvus_storage { // CombineReader merges scalar fields and vector fields to an entire record. class CombineReader : public arrow::RecordBatchReader { public: - static Result> Make(std::shared_ptr scalar_reader, - std::shared_ptr vector_reader, - std::shared_ptr schema); + static std::unique_ptr Make(std::unique_ptr scalar_reader, + std::unique_ptr vector_reader, + std::shared_ptr schema); std::shared_ptr schema() const override; arrow::Status ReadNext(std::shared_ptr* batch) override; - CombineReader(std::shared_ptr scalar_reader, - std::shared_ptr vector_reader, + CombineReader(std::unique_ptr scalar_reader, + std::unique_ptr vector_reader, std::shared_ptr schema) : scalar_reader_(std::move(scalar_reader)), vector_reader_(std::move(vector_reader)), schema_(std::move(schema)) {} private: - std::shared_ptr scalar_reader_; - std::shared_ptr vector_reader_; + std::unique_ptr scalar_reader_; + std::unique_ptr vector_reader_; std::shared_ptr schema_; }; } // namespace milvus_storage diff --git a/cpp/include/milvus-storage/reader/merge_record_reader.h b/cpp/include/milvus-storage/reader/merge_record_reader.h index 045e5fcd..b370d7c2 100644 --- a/cpp/include/milvus-storage/reader/merge_record_reader.h +++ b/cpp/include/milvus-storage/reader/merge_record_reader.h @@ -44,8 +44,8 @@ class MergeRecordReader : public arrow::RecordBatchReader { std::shared_ptr schema_; const ReadOptions options_; - std::shared_ptr scalar_reader_; - std::shared_ptr vector_reader_; + std::unique_ptr scalar_reader_; + std::unique_ptr vector_reader_; std::shared_ptr curr_reader_; const DeleteFragmentVector delete_fragments_; }; diff --git a/cpp/include/milvus-storage/reader/record_reader.h b/cpp/include/milvus-storage/reader/record_reader.h index 81bc68df..7fafabf7 100644 --- a/cpp/include/milvus-storage/reader/record_reader.h +++ b/cpp/include/milvus-storage/reader/record_reader.h @@ -31,10 +31,10 @@ bool only_contain_vector_columns(std::shared_ptr schema, const std::set< bool filters_only_contain_pk_and_version(std::shared_ptr schema, const Filter::FilterSet& filters); -Result> MakeScanDataReader(std::shared_ptr manifest, +std::unique_ptr MakeScanDataReader(std::shared_ptr manifest, std::shared_ptr fs); -std::shared_ptr MakeScanDeleteReader(std::shared_ptr manifest, +std::unique_ptr MakeScanDeleteReader(std::shared_ptr manifest, std::shared_ptr fs); } // namespace internal } // namespace milvus_storage diff --git a/cpp/include/milvus-storage/storage/manifest.h b/cpp/include/milvus-storage/storage/manifest.h index 194078b3..6b3e50a8 100644 --- a/cpp/include/milvus-storage/storage/manifest.h +++ b/cpp/include/milvus-storage/storage/manifest.h @@ -1,11 +1,11 @@ // Copyright 2023 Zilliz -// +// // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at -// +// // http://www.apache.org/licenses/LICENSE-2.0 -// +// // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -25,7 +25,7 @@ class Manifest { Manifest() = default; explicit Manifest(std::shared_ptr schema); - const std::shared_ptr schema(); + std::shared_ptr schema(); void add_scalar_fragment(Fragment&& fragment); diff --git a/cpp/include/milvus-storage/storage/space.h b/cpp/include/milvus-storage/storage/space.h index 3072222a..807fc200 100644 --- a/cpp/include/milvus-storage/storage/space.h +++ b/cpp/include/milvus-storage/storage/space.h @@ -27,17 +27,17 @@ class RecordReader; class Space { public: - Status Write(arrow::RecordBatchReader* reader, const WriteOption& option); + Status Write(arrow::RecordBatchReader& reader, const WriteOption& option); std::unique_ptr Read(const ReadOptions& option) const; // Scan delete files - Result> ScanDelete() const; + std::unique_ptr ScanDelete() const; // Scan data files without filtering deleted data - Result> ScanData() const; + std::unique_ptr ScanData() const; - Status Delete(arrow::RecordBatchReader* reader); + Status Delete(arrow::RecordBatchReader& reader); // Open opened a space or create if the space does not exist. // If space does not exist. schema should not be nullptr, or an error will be returned. @@ -54,7 +54,7 @@ class Space { // Get the byte size of a blob. Result GetBlobByteSize(const std::string& name) const; - std::vector StatisticsBlobs() const; + const std::vector& StatisticsBlobs() const; std::shared_ptr schema() const; diff --git a/cpp/src/reader/common/combine_reader.cpp b/cpp/src/reader/common/combine_reader.cpp index f554a9ac..5785e950 100644 --- a/cpp/src/reader/common/combine_reader.cpp +++ b/cpp/src/reader/common/combine_reader.cpp @@ -1,11 +1,11 @@ // Copyright 2023 Zilliz -// +// // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at -// +// // http://www.apache.org/licenses/LICENSE-2.0 -// +// // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -13,16 +13,14 @@ // limitations under the License. #include "reader/common/combine_reader.h" -#include "common/macro.h" +#include #include "arrow/type.h" namespace milvus_storage { -Result> CombineReader::Make(std::shared_ptr scalar_reader, - std::shared_ptr vector_reader, - std::shared_ptr schema) { - if (scalar_reader == nullptr || vector_reader == nullptr) { - return Status::InvalidArgument("null reader"); - } - return std::make_shared(scalar_reader, vector_reader, schema); +std::unique_ptr CombineReader::Make(std::unique_ptr scalar_reader, + std::unique_ptr vector_reader, + std::shared_ptr schema) { + assert(scalar_reader != nullptr && vector_reader != nullptr); + return std::make_unique(std::move(scalar_reader), std::move(vector_reader), schema); } std::shared_ptr CombineReader::schema() const { return schema_->schema(); } @@ -48,4 +46,5 @@ arrow::Status CombineReader::ReadNext(std::shared_ptr* batch *batch = arrow::RecordBatch::Make(schema(), scalar_batch->num_rows(), std::move(columns)); return arrow::Status::OK(); } -} // namespace milvus_storage \ No newline at end of file + +} // namespace milvus_storage diff --git a/cpp/src/reader/merge_record_reader.cpp b/cpp/src/reader/merge_record_reader.cpp index c3dc7eed..482066e5 100644 --- a/cpp/src/reader/merge_record_reader.cpp +++ b/cpp/src/reader/merge_record_reader.cpp @@ -1,11 +1,11 @@ // Copyright 2023 Zilliz -// +// // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at -// +// // http://www.apache.org/licenses/LICENSE-2.0 -// +// // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -36,8 +36,8 @@ MergeRecordReader::MergeRecordReader(const ReadOptions& options, std::shared_ptr fs, std::shared_ptr schema) : schema_(schema), fs_(fs), options_(options), delete_fragments_(delete_fragments) { - scalar_reader_ = std::make_shared(fs, scalar_fragments, schema->scalar_schema(), options); - vector_reader_ = std::make_shared(fs, vector_fragments, schema->vector_schema(), options); + scalar_reader_ = std::make_unique(fs, scalar_fragments, schema->scalar_schema(), options); + vector_reader_ = std::make_unique(fs, vector_fragments, schema->vector_schema(), options); } std::shared_ptr MergeRecordReader::schema() const { @@ -73,8 +73,8 @@ arrow::Status MergeRecordReader::ReadNext(std::shared_ptr* b } Result> MergeRecordReader::MakeInnerReader() { - ASSIGN_OR_RETURN_NOT_OK(auto combine_reader, CombineReader::Make(scalar_reader_, vector_reader_, schema_)); - auto delete_reader = DeleteMergeReader::Make(combine_reader, schema_->options(), delete_fragments_, options_); + auto combine_reader = CombineReader::Make(std::move(scalar_reader_), std::move(vector_reader_), schema_); + auto delete_reader = DeleteMergeReader::Make(std::move(combine_reader), schema_->options(), delete_fragments_, options_); ASSIGN_OR_RETURN_NOT_OK(auto res, ProjectionReader::Make(schema(), delete_reader, options_)); return res; } diff --git a/cpp/src/reader/record_reader.cpp b/cpp/src/reader/record_reader.cpp index 914eceb6..abeaeca1 100644 --- a/cpp/src/reader/record_reader.cpp +++ b/cpp/src/reader/record_reader.cpp @@ -72,8 +72,7 @@ std::unique_ptr MakeRecordReader(std::shared_ptr(options, scalar_data, vector_data, delete_fragments, fs, schema); } -bool only_contain_scalar_columns(const std::shared_ptr schema, - const std::set& related_columns) { +bool only_contain_scalar_columns(const std::shared_ptr schema, const std::set& related_columns) { for (auto& column : related_columns) { if (schema->options()->vector_column == column) { return false; @@ -82,8 +81,7 @@ bool only_contain_scalar_columns(const std::shared_ptr schema, return true; } -bool only_contain_vector_columns(const std::shared_ptr schema, - const std::set& related_columns) { +bool only_contain_vector_columns(const std::shared_ptr schema, const std::set& related_columns) { for (auto& column : related_columns) { if (schema->options()->vector_column != column && schema->options()->primary_column != column && schema->options()->version_column != column) { @@ -93,8 +91,7 @@ bool only_contain_vector_columns(const std::shared_ptr schema, return true; } -bool filters_only_contain_pk_and_version(std::shared_ptr schema, - const Filter::FilterSet& filters) { +bool filters_only_contain_pk_and_version(std::shared_ptr schema, const Filter::FilterSet& filters) { for (auto& filter : filters) { if (filter->get_column_name() != schema->options()->primary_column && filter->get_column_name() != schema->options()->version_column) { @@ -104,22 +101,20 @@ bool filters_only_contain_pk_and_version(std::shared_ptr schema, return true; } -Result> MakeScanDataReader( - std::shared_ptr manifest, std::shared_ptr fs) { - auto scalar_reader = std::make_shared(fs, manifest->scalar_fragments(), +std::unique_ptr MakeScanDataReader(std::shared_ptr manifest, + std::shared_ptr fs) { + auto scalar_reader = std::make_unique(fs, manifest->scalar_fragments(), manifest->schema()->scalar_schema(), ReadOptions()); - auto vector_reader = std::make_shared(fs, manifest->vector_fragments(), + auto vector_reader = std::make_unique(fs, manifest->vector_fragments(), manifest->schema()->vector_schema(), ReadOptions()); - ASSIGN_OR_RETURN_NOT_OK(auto combine_reader, CombineReader::Make(scalar_reader, vector_reader, manifest->schema())); - return std::static_pointer_cast(combine_reader); + return CombineReader::Make(std::move(scalar_reader), std::move(vector_reader), manifest->schema()); } -std::shared_ptr MakeScanDeleteReader( - std::shared_ptr manifest, std::shared_ptr fs) { - auto reader = std::make_shared(fs, manifest->delete_fragments(), - manifest->schema()->delete_schema(), ReadOptions()); - return reader; +std::unique_ptr MakeScanDeleteReader(std::shared_ptr manifest, + std::shared_ptr fs) { + return std::make_unique(fs, manifest->delete_fragments(), + manifest->schema()->delete_schema(), ReadOptions()); } } // namespace internal } // namespace milvus_storage diff --git a/cpp/src/storage/manifest.cpp b/cpp/src/storage/manifest.cpp index e8fd5c83..9def53fc 100644 --- a/cpp/src/storage/manifest.cpp +++ b/cpp/src/storage/manifest.cpp @@ -21,7 +21,7 @@ namespace milvus_storage { Manifest::Manifest(std::shared_ptr schema) : schema_(std::move(schema)) {} -const std::shared_ptr Manifest::schema() { return schema_; } +std::shared_ptr Manifest::schema() { return schema_; } void Manifest::add_scalar_fragment(Fragment&& fragment) { scalar_fragments_.push_back(fragment); } diff --git a/cpp/src/storage/space.cpp b/cpp/src/storage/space.cpp index bffe3981..235213ac 100644 --- a/cpp/src/storage/space.cpp +++ b/cpp/src/storage/space.cpp @@ -29,7 +29,6 @@ #include "common/log.h" #include "common/macro.h" #include "file/delete_fragment.h" -#include "filter/constant_filter.h" #include "format/parquet/file_writer.h" #include "storage/space.h" #include "common/utils.h" @@ -47,8 +46,8 @@ Status Space::Init() { return Status::OK(); } -Status Space::Write(arrow::RecordBatchReader* reader, const WriteOption& option) { - if (!reader->schema()->Equals(*this->manifest_->schema()->schema())) { +Status Space::Write(arrow::RecordBatchReader& reader, const WriteOption& option) { + if (!reader.schema()->Equals(*this->manifest_->schema()->schema())) { return Status::InvalidArgument("Schema not match"); } @@ -65,7 +64,7 @@ Status Space::Write(arrow::RecordBatchReader* reader, const WriteOption& option) Fragment scalar_fragment; Fragment vector_fragment; - for (auto rec = reader->Next(); rec.ok(); rec = reader->Next()) { + for (auto rec = reader.Next(); rec.ok(); rec = reader.Next()) { auto batch = rec.ValueOrDie(); if (batch == nullptr) { break; @@ -140,12 +139,12 @@ Status Space::Write(arrow::RecordBatchReader* reader, const WriteOption& option) return Status::OK(); } -Status Space::Delete(arrow::RecordBatchReader* reader) { +Status Space::Delete(arrow::RecordBatchReader& reader) { FileWriter* writer = nullptr; Fragment fragment; auto delete_fragment = std::make_shared(fs_, manifest_->schema()); std::string delete_file; - for (auto rec = reader->Next(); rec.ok(); rec = reader->Next()) { + for (auto rec = reader.Next(); rec.ok(); rec = reader.Next()) { auto batch = rec.ValueOrDie(); if (batch == nullptr) { break; @@ -307,13 +306,13 @@ Result Space::FindAllManifest(std::shared_ptr Space::StatisticsBlobs() const { return manifest_->blobs(); } +const std::vector& Space::StatisticsBlobs() const { return manifest_->blobs(); } -Result> Space::ScanDelete() const { +std::unique_ptr Space::ScanDelete() const { return internal::MakeScanDeleteReader(manifest_, fs_); } -Result> Space::ScanData() const { +std::unique_ptr Space::ScanData() const { return internal::MakeScanDataReader(manifest_, fs_); } diff --git a/cpp/test/space_test.cpp b/cpp/test/space_test.cpp index 4761e483..e65b1070 100644 --- a/cpp/test/space_test.cpp +++ b/cpp/test/space_test.cpp @@ -72,7 +72,7 @@ TEST(SpaceTest, SpaceWriteReadTest) { auto reader = arrow::RecordBatchReader::Make({rec_batch}, arrow_schema).ValueOrDie(); WriteOption write_option{10}; - space->Write(reader.get(), write_option); + space->Write(*reader, write_option); ConstantFilter filter(EQUAL, "pk_field", Value::Int64(1)); ReadOptions read_options;