From 75720c3a7a47cdc957cac1d907df4035a6fec3a2 Mon Sep 17 00:00:00 2001 From: Bingyi Sun Date: Tue, 2 Jan 2024 18:22:40 +0800 Subject: [PATCH] [Cpp]: redesign interfaces and rewrite (#104) Signed-off-by: sunby --- cpp/cmake/libopendal.cmake | 2 +- .../milvus-storage/common/arrow_util.h | 2 +- .../filter/conjunction_filter.h | 14 +++--- cpp/include/milvus-storage/filter/filter.h | 3 +- .../format/parquet/file_reader.h | 3 +- .../reader/common/delete_reader.h | 12 ++--- .../reader/common/filter_reader.h | 8 +-- .../reader/common/projection_reader.h | 6 +-- .../reader/filter_query_record_reader.h | 7 +-- .../reader/merge_record_reader.h | 5 +- .../reader/multi_files_sequential_reader.h | 5 +- .../milvus-storage/reader/record_reader.h | 37 +++++++------- .../reader/scan_record_reader.h | 4 +- cpp/include/milvus-storage/storage/manifest.h | 6 +-- cpp/include/milvus-storage/storage/options.h | 19 +++---- cpp/include/milvus-storage/storage/space.h | 30 ++++++------ cpp/src/common/arrow_util.cpp | 15 ++---- cpp/src/file/delete_fragment.cpp | 4 +- cpp/src/format/parquet/file_reader.cpp | 5 +- cpp/src/reader/common/delete_reader.cpp | 4 +- cpp/src/reader/common/filter_reader.cpp | 16 +++--- cpp/src/reader/common/projection_reader.cpp | 10 ++-- cpp/src/reader/filter_query_record_reader.cpp | 7 ++- cpp/src/reader/merge_record_reader.cpp | 5 +- .../reader/multi_files_sequential_reader.cpp | 4 +- cpp/src/reader/record_reader.cpp | 49 ++++++++++--------- cpp/src/reader/scan_record_reader.cpp | 4 +- cpp/src/storage/manifest.cpp | 6 +-- cpp/src/storage/space.cpp | 35 ++++++------- .../multi_files_sequential_reader_test.cpp | 12 ++--- cpp/test/space_test.cpp | 19 +++---- 31 files changed, 163 insertions(+), 195 deletions(-) diff --git a/cpp/cmake/libopendal.cmake b/cpp/cmake/libopendal.cmake index a21ab173..021952bf 100644 --- a/cpp/cmake/libopendal.cmake +++ b/cpp/cmake/libopendal.cmake @@ -25,7 +25,7 @@ function(build_opendal) CONFIGURE_COMMAND echo "configure for opendal_ep" BUILD_COMMAND cargo build ${OPENDAL_BUILD_OPTS} BUILD_IN_SOURCE 1 - INSTALL_COMMAND bash -c "cp ${OPENDAL_PREFIX}/src/opendal_ep/target/${OPENDAL_BUILD_TYPE}/${OPENDAL_NAME} ${OPENDAL_PREFIX}/lib/ && cp ${OPENDAL_PREFIX}/src/opendal_ep/bindings/c/include/opendal.h ${OPENDAL_PREFIX}/include/") + INSTALL_COMMAND bash -c "cp ${OPENDAL_PREFIX}/src/opendal_ep/bindings/c/target/${OPENDAL_BUILD_TYPE}/${OPENDAL_NAME} ${OPENDAL_PREFIX}/lib/ && cp ${OPENDAL_PREFIX}/src/opendal_ep/bindings/c/include/opendal.h ${OPENDAL_PREFIX}/include/") add_library(opendal STATIC IMPORTED) diff --git a/cpp/include/milvus-storage/common/arrow_util.h b/cpp/include/milvus-storage/common/arrow_util.h index 2fd041e6..5337b95e 100644 --- a/cpp/include/milvus-storage/common/arrow_util.h +++ b/cpp/include/milvus-storage/common/arrow_util.h @@ -25,5 +25,5 @@ Result> MakeArrowFileReader(std::sha Result> MakeArrowRecordBatchReader( std::shared_ptr reader, - std::shared_ptr options = ReadOptions::default_read_options()); + const ReadOptions& options = ReadOptions()); } // namespace milvus_storage diff --git a/cpp/include/milvus-storage/filter/conjunction_filter.h b/cpp/include/milvus-storage/filter/conjunction_filter.h index ae504f1f..7f900782 100644 --- a/cpp/include/milvus-storage/filter/conjunction_filter.h +++ b/cpp/include/milvus-storage/filter/conjunction_filter.h @@ -1,11 +1,11 @@ // Copyright 2023 Zilliz -// +// // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at -// +// // http://www.apache.org/licenses/LICENSE-2.0 -// +// // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -21,7 +21,7 @@ namespace milvus_storage { class ConjunctionOrFilter : public Filter { public: - explicit ConjunctionOrFilter(std::vector>& filters, std::string column_name) + explicit ConjunctionOrFilter(const FilterSet& filters, std::string column_name) : Filter(std::move(column_name)), filters_(filters) {} bool CheckStatistics(parquet::Statistics* stats) override; @@ -29,12 +29,12 @@ class ConjunctionOrFilter : public Filter { Status Apply(arrow::Array* col_data, filter_mask& bitset) override; private: - std::vector>& filters_; + const FilterSet& filters_; }; class ConjunctionAndFilter : public Filter { public: - explicit ConjunctionAndFilter(std::vector>& filters, std::string column_name) + explicit ConjunctionAndFilter(const FilterSet& filters, std::string column_name) : Filter(std::move(column_name)), filters_(filters) {} bool CheckStatistics(parquet::Statistics* stats) override; @@ -42,6 +42,6 @@ class ConjunctionAndFilter : public Filter { Status Apply(arrow::Array* col_data, filter_mask& bitset) override; private: - std::vector>& filters_; + const FilterSet& filters_; }; } // namespace milvus_storage diff --git a/cpp/include/milvus-storage/filter/filter.h b/cpp/include/milvus-storage/filter/filter.h index 1214c4e5..7f170051 100644 --- a/cpp/include/milvus-storage/filter/filter.h +++ b/cpp/include/milvus-storage/filter/filter.h @@ -27,6 +27,7 @@ namespace milvus_storage { using filter_mask = std::bitset; class Filter { public: + using FilterSet = std::vector; explicit Filter(std::string column_name) : column_name_(std::move(column_name)) {} virtual bool CheckStatistics(parquet::Statistics*) = 0; @@ -36,7 +37,7 @@ class Filter { virtual Status Apply(arrow::Array* col_data, filter_mask& bitset) = 0; static Status ApplyFilter(const std::shared_ptr& record_batch, - std::vector>& filters, + const FilterSet& filters, filter_mask& bitset) { for (auto& filter : filters) { auto col_data = record_batch->GetColumnByName(filter->get_column_name()); diff --git a/cpp/include/milvus-storage/format/parquet/file_reader.h b/cpp/include/milvus-storage/format/parquet/file_reader.h index ddb2ac5b..e1bf3eeb 100644 --- a/cpp/include/milvus-storage/format/parquet/file_reader.h +++ b/cpp/include/milvus-storage/format/parquet/file_reader.h @@ -21,7 +21,7 @@ namespace milvus_storage { class ParquetFileReader : public Reader { public: - ParquetFileReader(std::shared_ptr reader, std::shared_ptr& options); + ParquetFileReader(std::shared_ptr reader); void Close() override {} @@ -29,6 +29,5 @@ class ParquetFileReader : public Reader { private: std::shared_ptr reader_; - std::shared_ptr options_; }; } // namespace milvus_storage diff --git a/cpp/include/milvus-storage/reader/common/delete_reader.h b/cpp/include/milvus-storage/reader/common/delete_reader.h index a0f190f3..b5c2c621 100644 --- a/cpp/include/milvus-storage/reader/common/delete_reader.h +++ b/cpp/include/milvus-storage/reader/common/delete_reader.h @@ -1,11 +1,11 @@ // Copyright 2023 Zilliz -// +// // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at -// +// // http://www.apache.org/licenses/LICENSE-2.0 -// +// // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -33,7 +33,7 @@ class DeleteMergeReader : public arrow::RecordBatchReader { static std::shared_ptr Make(std::shared_ptr reader, std::shared_ptr schema_options, const DeleteFragmentVector& delete_fragments, - std::shared_ptr options); + const ReadOptions& options); std::shared_ptr schema() const override; arrow::Status ReadNext(std::shared_ptr* batch) override; @@ -41,7 +41,7 @@ class DeleteMergeReader : public arrow::RecordBatchReader { DeleteMergeReader(std::shared_ptr reader, DeleteFragmentVector delete_fragments, std::shared_ptr schema_options, - std::shared_ptr options) + const ReadOptions& options) : reader_(std::move(reader)), delete_fragments_(std::move(delete_fragments)), schema_options_(std::move(schema_options)), @@ -52,7 +52,7 @@ class DeleteMergeReader : public arrow::RecordBatchReader { std::shared_ptr filtered_batch_reader_; DeleteFragmentVector delete_fragments_; std::shared_ptr schema_options_; - std::shared_ptr options_; + const ReadOptions options_; }; // RecordBatchWithDeltedOffsets is reader helper to fetch records not deleted without copy diff --git a/cpp/include/milvus-storage/reader/common/filter_reader.h b/cpp/include/milvus-storage/reader/common/filter_reader.h index 2790254d..54f9706e 100644 --- a/cpp/include/milvus-storage/reader/common/filter_reader.h +++ b/cpp/include/milvus-storage/reader/common/filter_reader.h @@ -31,16 +31,16 @@ class FilterReader : public arrow::RecordBatchReader { arrow::Status ReadNext(std::shared_ptr* batch) override; static Result> Make(std::shared_ptr reader, - std::shared_ptr option); + const ReadOptions& option); - FilterReader(std::shared_ptr reader, std::shared_ptr option) - : record_reader_(std::move(reader)), option_(std::move(option)) {} + FilterReader(std::shared_ptr reader, const ReadOptions& option) + : record_reader_(std::move(reader)), option_(option) {} private: arrow::Status NextFilteredBatchReader(); std::shared_ptr record_reader_; - std::shared_ptr option_; + const ReadOptions& option_; std::shared_ptr current_filtered_batch_reader_; }; } // namespace milvus_storage diff --git a/cpp/include/milvus-storage/reader/common/projection_reader.h b/cpp/include/milvus-storage/reader/common/projection_reader.h index 31c633a6..2782849b 100644 --- a/cpp/include/milvus-storage/reader/common/projection_reader.h +++ b/cpp/include/milvus-storage/reader/common/projection_reader.h @@ -28,15 +28,15 @@ class ProjectionReader : public arrow::RecordBatchReader { static Result> Make(std::shared_ptr schema, std ::shared_ptr reader, - std::shared_ptr options); + const ReadOptions& options); ProjectionReader(std::shared_ptr schema, std ::shared_ptr reader, - std::shared_ptr options); + const ReadOptions& options); private: std::shared_ptr reader_; - std::shared_ptr options_; + const ReadOptions options_; std::shared_ptr schema_; }; } // namespace milvus_storage diff --git a/cpp/include/milvus-storage/reader/filter_query_record_reader.h b/cpp/include/milvus-storage/reader/filter_query_record_reader.h index 61a88632..7142b98f 100644 --- a/cpp/include/milvus-storage/reader/filter_query_record_reader.h +++ b/cpp/include/milvus-storage/reader/filter_query_record_reader.h @@ -18,14 +18,11 @@ #include #include "file/delete_fragment.h" #include "file/fragment.h" -#include "format/parquet/file_reader.h" -#include "reader/multi_files_sequential_reader.h" -#include "storage/space.h" namespace milvus_storage { class FilterQueryRecordReader : public arrow::RecordBatchReader { public: - FilterQueryRecordReader(std::shared_ptr options, + FilterQueryRecordReader(const ReadOptions& options, const FragmentVector& scalar_fragments, const FragmentVector& vector_fragments, const DeleteFragmentVector& delete_fragments, @@ -42,7 +39,7 @@ class FilterQueryRecordReader : public arrow::RecordBatchReader { std::shared_ptr fs_; std::shared_ptr schema_; - std::shared_ptr options_; + const ReadOptions options_; DeleteFragmentVector delete_fragments_; std::vector scalar_files_; diff --git a/cpp/include/milvus-storage/reader/merge_record_reader.h b/cpp/include/milvus-storage/reader/merge_record_reader.h index 2a51e554..045e5fcd 100644 --- a/cpp/include/milvus-storage/reader/merge_record_reader.h +++ b/cpp/include/milvus-storage/reader/merge_record_reader.h @@ -18,7 +18,6 @@ #include "file/delete_fragment.h" #include "file/fragment.h" #include "storage/options.h" -#include "storage/space.h" namespace milvus_storage { // MergeRecordReader is to scan files to get records and merge them together. @@ -27,7 +26,7 @@ namespace milvus_storage { // \ FileReader(scalar) class MergeRecordReader : public arrow::RecordBatchReader { public: - explicit MergeRecordReader(std::shared_ptr options, + explicit MergeRecordReader(const ReadOptions& options, const FragmentVector& scalar_fragments, const FragmentVector& vector_fragments, const DeleteFragmentVector& delete_fragments, @@ -43,7 +42,7 @@ class MergeRecordReader : public arrow::RecordBatchReader { std::shared_ptr fs_; std::shared_ptr schema_; - std::shared_ptr options_; + const ReadOptions options_; std::shared_ptr scalar_reader_; std::shared_ptr vector_reader_; diff --git a/cpp/include/milvus-storage/reader/multi_files_sequential_reader.h b/cpp/include/milvus-storage/reader/multi_files_sequential_reader.h index eac03c57..1e4c1dcb 100644 --- a/cpp/include/milvus-storage/reader/multi_files_sequential_reader.h +++ b/cpp/include/milvus-storage/reader/multi_files_sequential_reader.h @@ -19,7 +19,6 @@ #include #include "file/fragment.h" #include "storage/space.h" -#include "reader/multi_files_sequential_reader.h" namespace milvus_storage { @@ -28,7 +27,7 @@ class MultiFilesSequentialReader : public arrow::RecordBatchReader { MultiFilesSequentialReader(std::shared_ptr fs, const FragmentVector& fragments, std::shared_ptr schema, - std::shared_ptr options); + const ReadOptions& options); std::shared_ptr schema() const override; @@ -43,7 +42,7 @@ class MultiFilesSequentialReader : public arrow::RecordBatchReader { std::shared_ptr curr_reader_; std::shared_ptr holding_file_reader_; // file reader have to outlive than record batch reader, so we hold here. - std::shared_ptr options_; + const ReadOptions options_; friend FilterQueryRecordReader; }; diff --git a/cpp/include/milvus-storage/reader/record_reader.h b/cpp/include/milvus-storage/reader/record_reader.h index 0002bcea..81bc68df 100644 --- a/cpp/include/milvus-storage/reader/record_reader.h +++ b/cpp/include/milvus-storage/reader/record_reader.h @@ -1,11 +1,11 @@ // Copyright 2023 Zilliz -// +// // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at -// +// // http://www.apache.org/licenses/LICENSE-2.0 -// +// // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -15,27 +15,26 @@ #pragma once #include "file/delete_fragment.h" -#include "reader/filter_query_record_reader.h" +#include "storage/manifest.h" namespace milvus_storage { -struct RecordReader { - static std::unique_ptr MakeRecordReader(std::shared_ptr manifest, - std::shared_ptr schema, - std::shared_ptr fs, - DeleteFragmentVector delete_fragments, - std::shared_ptr& options); +namespace internal { +std::unique_ptr MakeRecordReader(std::shared_ptr manifest, + std::shared_ptr schema, + std::shared_ptr fs, + DeleteFragmentVector delete_fragments, + const ReadOptions& options); - static bool only_contain_scalar_columns(std::shared_ptr schema, const std::set& related_columns); +bool only_contain_scalar_columns(std::shared_ptr schema, const std::set& related_columns); - static bool only_contain_vector_columns(std::shared_ptr schema, const std::set& related_columns); +bool only_contain_vector_columns(std::shared_ptr schema, const std::set& related_columns); - static bool filters_only_contain_pk_and_version(std::shared_ptr schema, - const std::vector>& filters); +bool filters_only_contain_pk_and_version(std::shared_ptr schema, const Filter::FilterSet& filters); - static Result> MakeScanDataReader( - std::shared_ptr manifest, std::shared_ptr fs); +Result> MakeScanDataReader(std::shared_ptr manifest, + std::shared_ptr fs); - static std::shared_ptr MakeScanDeleteReader(std::shared_ptr manifest, - std::shared_ptr fs); -}; +std::shared_ptr MakeScanDeleteReader(std::shared_ptr manifest, + std::shared_ptr fs); +} // namespace internal } // namespace milvus_storage diff --git a/cpp/include/milvus-storage/reader/scan_record_reader.h b/cpp/include/milvus-storage/reader/scan_record_reader.h index 4a9eb557..763b98fb 100644 --- a/cpp/include/milvus-storage/reader/scan_record_reader.h +++ b/cpp/include/milvus-storage/reader/scan_record_reader.h @@ -22,7 +22,7 @@ namespace milvus_storage { class ScanRecordReader : public arrow::RecordBatchReader { public: ScanRecordReader(std::shared_ptr schema, - std::shared_ptr options, + const ReadOptions& options, std::shared_ptr fs, const FragmentVector& fragments, const DeleteFragmentVector& delete_fragments); @@ -35,7 +35,7 @@ class ScanRecordReader : public arrow::RecordBatchReader { Result> MakeInnerReader(); std::shared_ptr schema_; - std::shared_ptr options_; + const ReadOptions options_; std::shared_ptr fs_; const FragmentVector fragments_; const DeleteFragmentVector delete_fragments_; diff --git a/cpp/include/milvus-storage/storage/manifest.h b/cpp/include/milvus-storage/storage/manifest.h index 7c3b9278..194078b3 100644 --- a/cpp/include/milvus-storage/storage/manifest.h +++ b/cpp/include/milvus-storage/storage/manifest.h @@ -41,11 +41,11 @@ class Manifest { [[nodiscard]] const FragmentVector& delete_fragments() const; - bool has_blob(std::string& name); + bool has_blob(const std::string& name); - void remove_blob_if_exist(std::string& name); + void remove_blob_if_exist(const std::string& name); - Result get_blob(std::string& name); + Result get_blob(const std::string& name); [[nodiscard]] const std::vector& blobs() const; diff --git a/cpp/include/milvus-storage/storage/options.h b/cpp/include/milvus-storage/storage/options.h index debc2e45..7f4c6993 100644 --- a/cpp/include/milvus-storage/storage/options.h +++ b/cpp/include/milvus-storage/storage/options.h @@ -1,11 +1,11 @@ // Copyright 2023 Zilliz -// +// // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at -// +// // http://www.apache.org/licenses/LICENSE-2.0 -// +// // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -14,6 +14,7 @@ #pragma once +#include #include #include "filter/filter.h" #include "proto/manifest.pb.h" @@ -30,20 +31,12 @@ struct WriteOption { int64_t max_record_per_file = 1024; }; -using FilterSet = std::vector>; struct ReadOptions { - FilterSet filters; + Filter::FilterSet filters; + std::vector columns; // must have pk and version // int limit = -1; int64_t version = INT64_MAX; - - static std::shared_ptr default_read_options() { - static std::shared_ptr options = std::make_shared(); - return options; - } - - std::vector output_columns() { return columns; } - bool has_version() { return version != -1; } }; struct SchemaOptions { diff --git a/cpp/include/milvus-storage/storage/space.h b/cpp/include/milvus-storage/storage/space.h index ec776ee5..3072222a 100644 --- a/cpp/include/milvus-storage/storage/space.h +++ b/cpp/include/milvus-storage/storage/space.h @@ -1,11 +1,11 @@ // Copyright 2023 Zilliz -// +// // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at -// +// // http://www.apache.org/licenses/LICENSE-2.0 -// +// // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -15,12 +15,10 @@ #pragma once #include #include -#include #include #include "storage/manifest.h" #include "storage/schema.h" -#include "storage/space.h" #include "file/delete_fragment.h" namespace milvus_storage { @@ -29,15 +27,15 @@ class RecordReader; class Space { public: - Status Write(arrow::RecordBatchReader* reader, WriteOption* option); + Status Write(arrow::RecordBatchReader* reader, const WriteOption& option); - std::unique_ptr Read(std::shared_ptr option); + std::unique_ptr Read(const ReadOptions& option) const; // Scan delete files - Result> ScanDelete(); + Result> ScanDelete() const; // Scan data files without filtering deleted data - Result> ScanData(); + Result> ScanData() const; Status Delete(arrow::RecordBatchReader* reader); @@ -45,22 +43,22 @@ class Space { // If space does not exist. schema should not be nullptr, or an error will be returned. // If space exists and version is specified, it will restore to the state at this version, // or it will choose the latest version. - static Result> Open(const std::string& uri, Options options); + static Result> Open(const std::string& uri, const Options& options); // Write a blob to space. Will return a error if replace is false and a blob with the same name exists. - Status WriteBlob(std::string name, void* blob, int64_t length, bool replace = false); + Status WriteBlob(const std::string& name, const void* blob, int64_t length, bool replace = false); // Read a blob from space, the target must have enough size to hold this blob. - Status ReadBlob(std::string name, void* target); + Status ReadBlob(const std::string& name, void* target) const; // Get the byte size of a blob. - Result GetBlobByteSize(std::string name); + Result GetBlobByteSize(const std::string& name) const; - std::vector StatisticsBlobs(); + std::vector StatisticsBlobs() const; - std::shared_ptr schema(); + std::shared_ptr schema() const; - int64_t GetCurrentVersion(); + int64_t GetCurrentVersion() const; private: Status Init(); diff --git a/cpp/src/common/arrow_util.cpp b/cpp/src/common/arrow_util.cpp index 3c504ea4..623ef0e6 100644 --- a/cpp/src/common/arrow_util.cpp +++ b/cpp/src/common/arrow_util.cpp @@ -13,7 +13,6 @@ // limitations under the License. #include "common/arrow_util.h" -#include "common/log.h" #include "common/macro.h" namespace milvus_storage { @@ -27,22 +26,16 @@ Result> MakeArrowFileReader(std::sha } Result> MakeArrowRecordBatchReader( - std::shared_ptr reader, std::shared_ptr options) { + std::shared_ptr reader, const ReadOptions& options) { auto metadata = reader->parquet_reader()->metadata(); std::vector row_group_indices; std::vector column_indices; - // if (options->output_columns().size() == 0) { - // for (auto i = 0; i < metadata->schema()->num_columns(); i++) { - // auto column = metadata->schema()->Column(i); - // column_indices.emplace_back(column->logical_type) - // } - // } - for (const auto& column_name : options->columns) { + for (const auto& column_name : options.columns) { auto column_idx = metadata->schema()->ColumnIndex(column_name); column_indices.emplace_back(column_idx); } - for (const auto& filter : options->filters) { + for (const auto& filter : options.filters) { auto column_idx = metadata->schema()->ColumnIndex(filter->get_column_name()); column_indices.emplace_back(column_idx); } @@ -51,7 +44,7 @@ Result> MakeArrowRecordBatchReader( auto row_group_metadata = metadata->RowGroup(i); bool can_ignored = false; - for (const auto& filter : options->filters) { + for (const auto& filter : options.filters) { auto column_idx = metadata->schema()->ColumnIndex(filter->get_column_name()); auto column_meta = row_group_metadata->ColumnChunk(column_idx); auto stats = column_meta->statistics(); diff --git a/cpp/src/file/delete_fragment.cpp b/cpp/src/file/delete_fragment.cpp index aac002bc..9d88a338 100644 --- a/cpp/src/file/delete_fragment.cpp +++ b/cpp/src/file/delete_fragment.cpp @@ -48,8 +48,8 @@ Result DeleteFragment::Make(std::shared_ptr(); - opts->columns = schema->delete_schema()->field_names(); + ReadOptions opts; + opts.columns = schema->delete_schema()->field_names(); MultiFilesSequentialReader rec_reader(fs, {fragment}, schema->delete_schema(), opts); for (const auto& batch_rec : rec_reader) { ASSIGN_OR_RETURN_ARROW_NOT_OK(auto batch, batch_rec); diff --git a/cpp/src/format/parquet/file_reader.cpp b/cpp/src/format/parquet/file_reader.cpp index 2b34bf85..5ca12e7b 100644 --- a/cpp/src/format/parquet/file_reader.cpp +++ b/cpp/src/format/parquet/file_reader.cpp @@ -27,9 +27,8 @@ namespace milvus_storage { -ParquetFileReader::ParquetFileReader(std::shared_ptr reader, - std::shared_ptr& options) - : reader_(std::move(reader)), options_(options) {} +ParquetFileReader::ParquetFileReader(std::shared_ptr reader) + : reader_(std::move(reader)) {} Result> GetRecordAtOffset(arrow::RecordBatchReader* reader, int64_t offset) { int64_t skipped = 0; diff --git a/cpp/src/reader/common/delete_reader.cpp b/cpp/src/reader/common/delete_reader.cpp index 1dc62410..8d4e7b94 100644 --- a/cpp/src/reader/common/delete_reader.cpp +++ b/cpp/src/reader/common/delete_reader.cpp @@ -18,7 +18,7 @@ namespace milvus_storage { std::shared_ptr DeleteMergeReader::Make(std::shared_ptr reader, std::shared_ptr schema_options, const DeleteFragmentVector& delete_fragments, - std::shared_ptr options) { + const ReadOptions& options) { // DeleteFragmentVector filtered_delete_fragments; // for (auto& delete_fragment : delete_fragments) { // if (schema_options->has_version_column() || delete_fragment.id() > fragment_id) { @@ -56,7 +56,7 @@ arrow::Status DeleteMergeReader::ReadNext(std::shared_ptr* b return arrow::Status::Invalid("Version column not found"); } auto visitor = DeleteFilterVisitor(delete_fragments_, std::static_pointer_cast(version_col), - options_->version); + options_.version); auto pk_col = record_batch->GetColumnByName(schema_options_->primary_column); if (pk_col == nullptr) { diff --git a/cpp/src/reader/common/filter_reader.cpp b/cpp/src/reader/common/filter_reader.cpp index 2bae3034..42712599 100644 --- a/cpp/src/reader/common/filter_reader.cpp +++ b/cpp/src/reader/common/filter_reader.cpp @@ -1,11 +1,11 @@ // Copyright 2023 Zilliz -// +// // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at -// +// // http://www.apache.org/licenses/LICENSE-2.0 -// +// // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -15,15 +15,12 @@ #include "reader/common/filter_reader.h" #include #include "arrow/record_batch.h" -#include "arrow/table.h" -#include "common/log.h" #include -#include namespace milvus_storage { Result> FilterReader::Make(std::shared_ptr reader, - std::shared_ptr option) { + const ReadOptions& option) { return std::make_shared(reader, option); } @@ -58,8 +55,7 @@ arrow::Status FilterReader::ReadNext(std::shared_ptr* batch) } } -arrow::RecordBatchVector ApplyFilter(std::shared_ptr& batch, - std::vector>& filters) { +arrow::RecordBatchVector ApplyFilter(std::shared_ptr& batch, const Filter::FilterSet& filters) { filter_mask bitset; Filter::ApplyFilter(batch, filters, bitset); if (bitset.none()) { @@ -101,7 +97,7 @@ arrow::Status FilterReader::NextFilteredBatchReader() { break; } - filtered_batches = ApplyFilter(rec_batch, option_->filters); + filtered_batches = ApplyFilter(rec_batch, option_.filters); } while (filtered_batches.empty()); if (filtered_batches.empty()) { diff --git a/cpp/src/reader/common/projection_reader.cpp b/cpp/src/reader/common/projection_reader.cpp index e96e60e6..b217bd24 100644 --- a/cpp/src/reader/common/projection_reader.cpp +++ b/cpp/src/reader/common/projection_reader.cpp @@ -24,14 +24,14 @@ namespace milvus_storage { ProjectionReader::ProjectionReader(std::shared_ptr schema, std ::shared_ptr reader, - std::shared_ptr options) - : reader_(std::move(reader)), options_(std::move(options)), schema_(schema) {} + const ReadOptions& options) + : reader_(std::move(reader)), options_(options), schema_(schema) {} Result> ProjectionReader::Make( std::shared_ptr schema, std ::shared_ptr reader, - std::shared_ptr options) { - ASSIGN_OR_RETURN_NOT_OK(auto projection_schema, ProjectSchema(schema, options->columns)); + const ReadOptions& options) { + ASSIGN_OR_RETURN_NOT_OK(auto projection_schema, ProjectSchema(schema, options.columns)); std::shared_ptr projection_reader = std::make_shared(projection_schema, reader, options); return projection_reader; @@ -57,7 +57,7 @@ arrow::Status ProjectionReader::ReadNext(std::shared_ptr* ba std::vector> projection_cols; for (int i = 0; i < tmp->num_columns(); ++i) { auto col_name = tmp->column_name(i); - if (std::find(options_->columns.begin(), options_->columns.end(), col_name) != options_->columns.end()) { + if (std::find(options_.columns.begin(), options_.columns.end(), col_name) != options_.columns.end()) { projection_cols.push_back(tmp->column(i)); } } diff --git a/cpp/src/reader/filter_query_record_reader.cpp b/cpp/src/reader/filter_query_record_reader.cpp index ccbff606..182ae806 100644 --- a/cpp/src/reader/filter_query_record_reader.cpp +++ b/cpp/src/reader/filter_query_record_reader.cpp @@ -25,11 +25,10 @@ #include "reader/common/delete_reader.h" #include "reader/common/filter_reader.h" #include "reader/common/projection_reader.h" -#include "reader/multi_files_sequential_reader.h" #include "common/utils.h" namespace milvus_storage { -FilterQueryRecordReader::FilterQueryRecordReader(std::shared_ptr options, +FilterQueryRecordReader::FilterQueryRecordReader(const ReadOptions& options, const FragmentVector& scalar_fragments, const FragmentVector& vector_fragments, const DeleteFragmentVector& delete_fragments, @@ -48,7 +47,7 @@ FilterQueryRecordReader::FilterQueryRecordReader(std::shared_ptr op assert(scalar_files_.size() == vector_files_.size()); } std::shared_ptr FilterQueryRecordReader::schema() const { - auto r = ProjectSchema(schema_->schema(), options_->output_columns()); + auto r = ProjectSchema(schema_->schema(), options_.columns); if (!r.ok()) { return nullptr; } @@ -97,7 +96,7 @@ Result> FilterQueryRecordReader::MakeI ASSIGN_OR_RETURN_NOT_OK(holding_scalar_file_reader_, MakeArrowFileReader(fs_, scalar_file)); ASSIGN_OR_RETURN_NOT_OK(holding_vector_file_reader_, MakeArrowFileReader(fs_, vector_file)); ASSIGN_OR_RETURN_NOT_OK(auto scalar_rec_reader, MakeArrowRecordBatchReader(holding_scalar_file_reader_, options_)); - auto current_vector_reader = std::make_shared(holding_vector_file_reader_, options_); + auto current_vector_reader = std::make_shared(holding_vector_file_reader_); ASSIGN_OR_RETURN_NOT_OK(auto combine_reader, CombineOffsetReader::Make(scalar_rec_reader, current_vector_reader, schema_)); diff --git a/cpp/src/reader/merge_record_reader.cpp b/cpp/src/reader/merge_record_reader.cpp index 24aa9868..c3dc7eed 100644 --- a/cpp/src/reader/merge_record_reader.cpp +++ b/cpp/src/reader/merge_record_reader.cpp @@ -18,7 +18,6 @@ #include #include #include -#include "common/arrow_util.h" #include "common/macro.h" #include "common/status.h" #include "reader/common/combine_reader.h" @@ -30,7 +29,7 @@ namespace milvus_storage { -MergeRecordReader::MergeRecordReader(std::shared_ptr options, +MergeRecordReader::MergeRecordReader(const ReadOptions& options, const FragmentVector& scalar_fragments, const FragmentVector& vector_fragments, const DeleteFragmentVector& delete_fragments, @@ -42,7 +41,7 @@ MergeRecordReader::MergeRecordReader(std::shared_ptr options, } std::shared_ptr MergeRecordReader::schema() const { - auto r = ProjectSchema(schema_->schema(), options_->output_columns()); + auto r = ProjectSchema(schema_->schema(), options_.columns); return r.ok() ? r.value() : nullptr; } diff --git a/cpp/src/reader/multi_files_sequential_reader.cpp b/cpp/src/reader/multi_files_sequential_reader.cpp index b397ab9d..69733535 100644 --- a/cpp/src/reader/multi_files_sequential_reader.cpp +++ b/cpp/src/reader/multi_files_sequential_reader.cpp @@ -15,16 +15,14 @@ #include "reader/multi_files_sequential_reader.h" #include #include -#include #include "common/arrow_util.h" -#include "common/macro.h" namespace milvus_storage { MultiFilesSequentialReader::MultiFilesSequentialReader(std::shared_ptr fs, const FragmentVector& fragments, std::shared_ptr schema, - std::shared_ptr options) + const ReadOptions& options) : fs_(fs), schema_(std::move(schema)), options_(options) { for (const auto& fragment : fragments) { files_.insert(files_.end(), fragment.files().begin(), fragment.files().end()); diff --git a/cpp/src/reader/record_reader.cpp b/cpp/src/reader/record_reader.cpp index 5ab39ea9..914eceb6 100644 --- a/cpp/src/reader/record_reader.cpp +++ b/cpp/src/reader/record_reader.cpp @@ -1,11 +1,11 @@ // Copyright 2023 Zilliz -// +// // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at -// +// // http://www.apache.org/licenses/LICENSE-2.0 -// +// // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -43,17 +43,19 @@ DeleteFragmentVector FilterDeleteFragments(FragmentVector& data_fragments, Delet return res; } -std::unique_ptr RecordReader::MakeRecordReader(std::shared_ptr manifest, - std::shared_ptr schema, - std::shared_ptr fs, - DeleteFragmentVector delete_fragments, - std::shared_ptr& options) { +namespace internal { + +std::unique_ptr MakeRecordReader(std::shared_ptr manifest, + std::shared_ptr schema, + std::shared_ptr fs, + DeleteFragmentVector delete_fragments, + const ReadOptions& options) { // TODO: Implement a common optimization method. For now we just enumerate few plans. std::set related_columns; - for (auto& column : options->columns) { + for (auto& column : options.columns) { related_columns.insert(column); } - for (auto& filter : options->filters) { + for (auto& filter : options.filters) { related_columns.insert(filter->get_column_name()); } @@ -64,13 +66,13 @@ std::unique_ptr RecordReader::MakeRecordReader(std::sh return std::make_unique(schema, options, fs, data_fragments, delete_fragments); } - if (filters_only_contain_pk_and_version(schema, options->filters)) { + if (filters_only_contain_pk_and_version(schema, options.filters)) { return std::make_unique(options, scalar_data, vector_data, delete_fragments, fs, schema); } return std::make_unique(options, scalar_data, vector_data, delete_fragments, fs, schema); } -bool RecordReader::only_contain_scalar_columns(const std::shared_ptr schema, +bool only_contain_scalar_columns(const std::shared_ptr schema, const std::set& related_columns) { for (auto& column : related_columns) { if (schema->options()->vector_column == column) { @@ -80,7 +82,7 @@ bool RecordReader::only_contain_scalar_columns(const std::shared_ptr sch return true; } -bool RecordReader::only_contain_vector_columns(const std::shared_ptr schema, +bool only_contain_vector_columns(const std::shared_ptr schema, const std::set& related_columns) { for (auto& column : related_columns) { if (schema->options()->vector_column != column && schema->options()->primary_column != column && @@ -91,8 +93,8 @@ bool RecordReader::only_contain_vector_columns(const std::shared_ptr sch return true; } -bool RecordReader::filters_only_contain_pk_and_version(std::shared_ptr schema, - const std::vector>& filters) { +bool filters_only_contain_pk_and_version(std::shared_ptr schema, + const Filter::FilterSet& filters) { for (auto& filter : filters) { if (filter->get_column_name() != schema->options()->primary_column && filter->get_column_name() != schema->options()->version_column) { @@ -102,21 +104,22 @@ bool RecordReader::filters_only_contain_pk_and_version(std::shared_ptr s return true; } -Result> RecordReader::MakeScanDataReader( +Result> MakeScanDataReader( std::shared_ptr manifest, std::shared_ptr fs) { - auto scalar_reader = std::make_shared( - fs, manifest->scalar_fragments(), manifest->schema()->scalar_schema(), ReadOptions::default_read_options()); - auto vector_reader = std::make_shared( - fs, manifest->vector_fragments(), manifest->schema()->vector_schema(), ReadOptions::default_read_options()); + auto scalar_reader = std::make_shared(fs, manifest->scalar_fragments(), + manifest->schema()->scalar_schema(), ReadOptions()); + auto vector_reader = std::make_shared(fs, manifest->vector_fragments(), + manifest->schema()->vector_schema(), ReadOptions()); ASSIGN_OR_RETURN_NOT_OK(auto combine_reader, CombineReader::Make(scalar_reader, vector_reader, manifest->schema())); return std::static_pointer_cast(combine_reader); } -std::shared_ptr RecordReader::MakeScanDeleteReader( +std::shared_ptr MakeScanDeleteReader( std::shared_ptr manifest, std::shared_ptr fs) { - auto reader = std::make_shared( - fs, manifest->delete_fragments(), manifest->schema()->delete_schema(), ReadOptions::default_read_options()); + auto reader = std::make_shared(fs, manifest->delete_fragments(), + manifest->schema()->delete_schema(), ReadOptions()); return reader; } +} // namespace internal } // namespace milvus_storage diff --git a/cpp/src/reader/scan_record_reader.cpp b/cpp/src/reader/scan_record_reader.cpp index af109e08..31987389 100644 --- a/cpp/src/reader/scan_record_reader.cpp +++ b/cpp/src/reader/scan_record_reader.cpp @@ -24,14 +24,14 @@ namespace milvus_storage { ScanRecordReader::ScanRecordReader(std::shared_ptr schema, - std::shared_ptr options, + const ReadOptions& options, std::shared_ptr fs, const FragmentVector& fragments, const DeleteFragmentVector& delete_fragments) : schema_(schema), options_(options), fs_(fs), fragments_(fragments), delete_fragments_(delete_fragments) {} std::shared_ptr ScanRecordReader::schema() const { - auto r = ProjectSchema(schema_->schema(), options_->output_columns()); + auto r = ProjectSchema(schema_->schema(), options_.columns); if (!r.ok()) { return nullptr; } diff --git a/cpp/src/storage/manifest.cpp b/cpp/src/storage/manifest.cpp index 9407d6ad..e8fd5c83 100644 --- a/cpp/src/storage/manifest.cpp +++ b/cpp/src/storage/manifest.cpp @@ -37,16 +37,16 @@ const FragmentVector& Manifest::vector_fragments() const { return vector_fragmen const FragmentVector& Manifest::delete_fragments() const { return delete_fragments_; } -bool Manifest::has_blob(std::string& name) { +bool Manifest::has_blob(const std::string& name) { auto iter = std::find_if(blobs_.begin(), blobs_.end(), [&](Blob& blob) { return blob.name == name; }); return iter != blobs_.end(); } -void Manifest::remove_blob_if_exist(std::string& name) { +void Manifest::remove_blob_if_exist(const std::string& name) { std::remove_if(blobs_.begin(), blobs_.end(), [&](Blob& blob) { return blob.name == name; }); } -Result Manifest::get_blob(std::string& name) { +Result Manifest::get_blob(const std::string& name) { auto iter = std::find_if(blobs_.begin(), blobs_.end(), [&](Blob& blob) { return blob.name == name; }); if (iter == blobs_.end()) { return Status::FileNotFound("blob not found"); diff --git a/cpp/src/storage/space.cpp b/cpp/src/storage/space.cpp index ff558f73..bffe3981 100644 --- a/cpp/src/storage/space.cpp +++ b/cpp/src/storage/space.cpp @@ -32,7 +32,6 @@ #include "filter/constant_filter.h" #include "format/parquet/file_writer.h" #include "storage/space.h" -#include "arrow/util/uri.h" #include "common/utils.h" #include "storage/manifest.h" #include "reader/record_reader.h" @@ -48,7 +47,7 @@ Status Space::Init() { return Status::OK(); } -Status Space::Write(arrow::RecordBatchReader* reader, WriteOption* option) { +Status Space::Write(arrow::RecordBatchReader* reader, const WriteOption& option) { if (!reader->schema()->Equals(*this->manifest_->schema()->schema())) { return Status::InvalidArgument("Schema not match"); } @@ -112,7 +111,7 @@ Status Space::Write(arrow::RecordBatchReader* reader, WriteOption* option) { RETURN_NOT_OK(scalar_writer->Write(scalar_record.get())); RETURN_NOT_OK(vector_writer->Write(vector_record.get())); - if (scalar_writer->count() >= option->max_record_per_file) { + if (scalar_writer->count() >= option.max_record_per_file) { scalar_writer->Close(); vector_writer->Close(); scalar_writer.reset(); @@ -181,16 +180,12 @@ Status Space::Delete(arrow::RecordBatchReader* reader) { return Status::OK(); } -std::unique_ptr Space::Read(std::shared_ptr option) { - if (manifest_->schema()->options()->has_version_column()) { - option->filters.push_back(std::make_unique( - ComparisonType::LESS_EQUAL, manifest_->schema()->options()->version_column, option->version)); - } +std::unique_ptr Space::Read(const ReadOptions& option) const { // TODO: remove second argument - return RecordReader::MakeRecordReader(manifest_, manifest_->schema(), fs_, delete_fragments_, option); + return internal::MakeRecordReader(manifest_, manifest_->schema(), fs_, delete_fragments_, option); } -Status Space::WriteBlob(std::string name, void* blob, int64_t length, bool replace) { +Status Space::WriteBlob(const std::string& name, const void* blob, int64_t length, bool replace) { if (!replace && manifest_->has_blob(name)) { return Status::InvalidArgument("blob already exist"); } @@ -211,7 +206,7 @@ Status Space::WriteBlob(std::string name, void* blob, int64_t length, bool repla return Status::OK(); } -Status Space::ReadBlob(std::string name, void* target) { +Status Space::ReadBlob(const std::string& name, void* target) const { auto manifest = manifest_; ASSIGN_OR_RETURN_NOT_OK(auto blob, manifest->get_blob(name)); ASSIGN_OR_RETURN_ARROW_NOT_OK(auto file, fs_->OpenInputFile(blob.file)); @@ -219,7 +214,7 @@ Status Space::ReadBlob(std::string name, void* target) { return Status::OK(); } -Result Space::GetBlobByteSize(std::string name) { +Result Space::GetBlobByteSize(const std::string& name) const { auto manifest = manifest_; ASSIGN_OR_RETURN_NOT_OK(auto blob, manifest->get_blob(name)); return blob.size; @@ -240,7 +235,7 @@ Status Space::SafeSaveManifest(std::shared_ptr fs, return Status::OK(); } -Result> Space::Open(const std::string& uri, Options options) { +Result> Space::Open(const std::string& uri, const Options& options) { std::shared_ptr fs; std::shared_ptr manifest; std::string path; @@ -312,18 +307,18 @@ Result Space::FindAllManifest(std::shared_ptr Space::StatisticsBlobs() { return manifest_->blobs(); } +std::vector Space::StatisticsBlobs() const { return manifest_->blobs(); } -Result> Space::ScanDelete() { - return RecordReader::MakeScanDeleteReader(manifest_, fs_); +Result> Space::ScanDelete() const { + return internal::MakeScanDeleteReader(manifest_, fs_); } -Result> Space::ScanData() { - return RecordReader::MakeScanDataReader(manifest_, fs_); +Result> Space::ScanData() const { + return internal::MakeScanDataReader(manifest_, fs_); } -std::shared_ptr Space::schema() { return manifest_->schema(); } +std::shared_ptr Space::schema() const { return manifest_->schema(); } -int64_t Space::GetCurrentVersion() { return manifest_->version(); } +int64_t Space::GetCurrentVersion() const { return manifest_->version(); } } // namespace milvus_storage diff --git a/cpp/test/multi_files_sequential_reader_test.cpp b/cpp/test/multi_files_sequential_reader_test.cpp index fab8f371..13b64609 100644 --- a/cpp/test/multi_files_sequential_reader_test.cpp +++ b/cpp/test/multi_files_sequential_reader_test.cpp @@ -1,11 +1,11 @@ // Copyright 2023 Zilliz -// +// // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at -// +// // http://www.apache.org/licenses/LICENSE-2.0 -// +// // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -55,8 +55,8 @@ TEST(MultiFilesSeqReaderTest, ReadTest) { Fragment frag(1); frag.add_file("/tmp/file1"); frag.add_file("/tmp/file2"); - auto opt = std::make_shared(); - opt->columns.emplace_back("pk_field"); + ReadOptions opt; + opt.columns.emplace_back("pk_field"); MultiFilesSequentialReader r(fs, {frag}, arrow_schema, opt); ASSERT_AND_ARROW_ASSIGN(auto table, r.ToTable()); ASSERT_AND_ARROW_ASSIGN(auto combined_table, table->CombineChunks()); @@ -70,4 +70,4 @@ TEST(MultiFilesSeqReaderTest, ReadTest) { ASSERT_STATUS_OK(r.Close()); } -} // namespace milvus_storage \ No newline at end of file +} // namespace milvus_storage diff --git a/cpp/test/space_test.cpp b/cpp/test/space_test.cpp index 24b183c0..4761e483 100644 --- a/cpp/test/space_test.cpp +++ b/cpp/test/space_test.cpp @@ -1,11 +1,11 @@ // Copyright 2023 Zilliz -// +// // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at -// +// // http://www.apache.org/licenses/LICENSE-2.0 -// +// // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -24,6 +24,7 @@ #include #include #include +#include "storage/options.h" #include "test_util.h" #include "arrow/table.h" @@ -70,13 +71,13 @@ TEST(SpaceTest, SpaceWriteReadTest) { auto rec_batch = arrow::RecordBatch::Make(arrow_schema, 3, {pk_array, ts_array, vec_array}); auto reader = arrow::RecordBatchReader::Make({rec_batch}, arrow_schema).ValueOrDie(); - auto write_option = WriteOption{10}; - space->Write(reader.get(), &write_option); + WriteOption write_option{10}; + space->Write(reader.get(), write_option); - std::unique_ptr filter = std::make_unique(EQUAL, "pk_field", Value::Int64(1)); - auto read_options = std::make_shared(); - read_options->filters.push_back(std::move(filter)); - read_options->columns.emplace_back("pk_field"); + ConstantFilter filter(EQUAL, "pk_field", Value::Int64(1)); + ReadOptions read_options; + read_options.filters.push_back(&filter); + read_options.columns.emplace_back("pk_field"); auto res_reader = space->Read(read_options); ASSERT_AND_ARROW_ASSIGN(auto table, res_reader->ToTable()); auto pk_chunk_arr = table->GetColumnByName("pk_field");