diff --git a/cpp/.clang-tidy b/cpp/.clang-tidy index 5488f26..725b03e 100644 --- a/cpp/.clang-tidy +++ b/cpp/.clang-tidy @@ -26,7 +26,8 @@ Checks: > performance-trivially-destructible, performance-inefficient-vector-operation, performance-move-const-arg, performance-move-constructor-init, performance-noexcept-move-constructor, performance-no-automatic-move, - performance-type-promotion-in-math-fn + performance-type-promotion-in-math-fn, performance-unnecessary-copy-initialization, + performance-unnecessary-value-param # produce HeaderFilterRegex from core/build-support/lint_exclusions.txt with: # echo -n '^?!('; sed -e 's/*/\.*/g' core/build-support/lint_exclusions.txt | tr '\n' '|'; echo ')$' diff --git a/cpp/Makefile b/cpp/Makefile index 963d390..aeb255c 100644 --- a/cpp/Makefile +++ b/cpp/Makefile @@ -21,10 +21,12 @@ test: build fix-format: find ./src -type f ! -name "*.pb.h" -iname *.h -o -iname *.cpp | xargs clang-format -i + find ./include -type f ! -name "*.pb.h" -iname *.h -o -iname *.cpp | xargs clang-format -i find ./test -type f ! -name "*.pb.h" -iname *.h -o -iname *.cpp | xargs clang-format -i check-format: find ./src -type f ! -name "*.pb.h" -iname *.h -o -iname *.cpp | xargs clang-format --dry-run --Werror + find ./include -type f ! -name "*.pb.h" -iname *.h -o -iname *.cpp | xargs clang-format --dry-run --Werror find ./test -type f ! -name "*.pb.h" -iname *.h -o -iname *.cpp | xargs clang-format --dry-run --Werror check-tidy: diff --git a/cpp/include/milvus-storage/common/arrow_util.h b/cpp/include/milvus-storage/common/arrow_util.h index 81fccab..2e508d3 100644 --- a/cpp/include/milvus-storage/common/arrow_util.h +++ b/cpp/include/milvus-storage/common/arrow_util.h @@ -23,6 +23,8 @@ namespace milvus_storage { Result> MakeArrowFileReader(arrow::fs::FileSystem& fs, const std::string& file_path); -Result> MakeArrowRecordBatchReader( - parquet::arrow::FileReader& reader, const ReadOptions& options = ReadOptions()); +Result> MakeArrowRecordBatchReader(parquet::arrow::FileReader& reader, + std::shared_ptr schema, + const SchemaOptions& schema_options, + const ReadOptions& options = {}); } // namespace milvus_storage diff --git a/cpp/include/milvus-storage/common/constants.h b/cpp/include/milvus-storage/common/constants.h index fa2dec8..440927e 100644 --- a/cpp/include/milvus-storage/common/constants.h +++ b/cpp/include/milvus-storage/common/constants.h @@ -1,17 +1,18 @@ // Copyright 2023 Zilliz -// +// // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at -// +// // http://www.apache.org/licenses/LICENSE-2.0 -// +// // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. +#pragma once #include namespace milvus_storage { diff --git a/cpp/include/milvus-storage/common/fs_util.h b/cpp/include/milvus-storage/common/fs_util.h index 648edd5..67be428 100644 --- a/cpp/include/milvus-storage/common/fs_util.h +++ b/cpp/include/milvus-storage/common/fs_util.h @@ -1,11 +1,11 @@ // Copyright 2023 Zilliz -// +// // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at -// +// // http://www.apache.org/licenses/LICENSE-2.0 -// +// // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. diff --git a/cpp/include/milvus-storage/common/log.h b/cpp/include/milvus-storage/common/log.h index 8fc0a9f..3f51e85 100644 --- a/cpp/include/milvus-storage/common/log.h +++ b/cpp/include/milvus-storage/common/log.h @@ -1,11 +1,11 @@ // Copyright 2023 Zilliz -// +// // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at -// +// // http://www.apache.org/licenses/LICENSE-2.0 -// +// // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -57,16 +57,8 @@ ///////////////////////////////////////////////////////////////////////////////////////////////// #define STORAGE_MODULE_NAME "STORAGE" #define STORAGE_MODULE_CLASS_FUNCTION \ - LogOut("[%s][%s::%s][%s] ", \ - STORAGE_MODULE_NAME, \ - (typeid(*this).name()), \ - __FUNCTION__, \ - GetThreadName().c_str()) -#define STORAGE_MODULE_FUNCTION \ - LogOut("[%s][%s][%s] ", \ - STORAGE_MODULE_NAME, \ - __FUNCTION__, \ - GetThreadName().c_str()) + LogOut("[%s][%s::%s][%s] ", STORAGE_MODULE_NAME, (typeid(*this).name()), __FUNCTION__, GetThreadName().c_str()) +#define STORAGE_MODULE_FUNCTION LogOut("[%s][%s][%s] ", STORAGE_MODULE_NAME, __FUNCTION__, GetThreadName().c_str()) #define LOG_STORAGE_TRACE_ DLOG(INFO) << STORAGE_MODULE_FUNCTION #define LOG_STORAGE_DEBUG_ DLOG(INFO) << STORAGE_MODULE_FUNCTION @@ -77,16 +69,12 @@ ///////////////////////////////////////////////////////////////////////////////////////////////// -std::string -LogOut(const char* pattern, ...); +std::string LogOut(const char* pattern, ...); -void -SetThreadName(const std::string_view name); +void SetThreadName(const std::string_view name); -std::string -GetThreadName(); +std::string GetThreadName(); -int64_t -get_thread_start_timestamp(); +int64_t get_thread_start_timestamp(); // } // namespace milvus diff --git a/cpp/include/milvus-storage/common/macro.h b/cpp/include/milvus-storage/common/macro.h index 74b91f7..afb1488 100644 --- a/cpp/include/milvus-storage/common/macro.h +++ b/cpp/include/milvus-storage/common/macro.h @@ -1,11 +1,11 @@ // Copyright 2023 Zilliz -// +// // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at -// +// // http://www.apache.org/licenses/LICENSE-2.0 -// +// // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. diff --git a/cpp/include/milvus-storage/common/opendal_fs.h b/cpp/include/milvus-storage/common/opendal_fs.h index 176b3d3..85a4262 100644 --- a/cpp/include/milvus-storage/common/opendal_fs.h +++ b/cpp/include/milvus-storage/common/opendal_fs.h @@ -1,11 +1,11 @@ // Copyright 2023 Zilliz -// +// // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at -// +// // http://www.apache.org/licenses/LICENSE-2.0 -// +// // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. diff --git a/cpp/include/milvus-storage/common/result.h b/cpp/include/milvus-storage/common/result.h index a24bf3a..7f8cd8b 100644 --- a/cpp/include/milvus-storage/common/result.h +++ b/cpp/include/milvus-storage/common/result.h @@ -1,11 +1,11 @@ // Copyright 2023 Zilliz -// +// // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at -// +// // http://www.apache.org/licenses/LICENSE-2.0 -// +// // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. diff --git a/cpp/include/milvus-storage/common/status.h b/cpp/include/milvus-storage/common/status.h index 1ef74a5..9609683 100644 --- a/cpp/include/milvus-storage/common/status.h +++ b/cpp/include/milvus-storage/common/status.h @@ -1,11 +1,11 @@ // Copyright 2023 Zilliz -// +// // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at -// +// // http://www.apache.org/licenses/LICENSE-2.0 -// +// // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. diff --git a/cpp/include/milvus-storage/common/utils.h b/cpp/include/milvus-storage/common/utils.h index 627a6cc..13a929c 100644 --- a/cpp/include/milvus-storage/common/utils.h +++ b/cpp/include/milvus-storage/common/utils.h @@ -1,11 +1,11 @@ // Copyright 2023 Zilliz -// +// // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at -// +// // http://www.apache.org/licenses/LICENSE-2.0 -// +// // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -19,6 +19,7 @@ #include #include "proto/schema_arrow.pb.h" #include "result.h" +#include "storage/options.h" namespace milvus_storage { @@ -32,11 +33,14 @@ std::string GetManifestFilePath(const std::string& path, int64_t version); std::string GetManifestTmpFilePath(const std::string& path, int64_t version); -Result> ProjectSchema(std::shared_ptr schema, - std::vector columns); +Result> ProjectSchema(std::shared_ptr schema, const ReadOptions& options); int64_t ParseVersionFromFileName(const std::string& path); +ReadOptions CreateInternalReadOptions(std::shared_ptr schema, + const SchemaOptions& schema_options, + const ReadOptions& options); + std::string GetManifestDir(const std::string& path); std::string GetScalarDataDir(const std::string& path); std::string GetVectorDataDir(const std::string& path); diff --git a/cpp/include/milvus-storage/file/blob.h b/cpp/include/milvus-storage/file/blob.h index 8b269ff..4c73b27 100644 --- a/cpp/include/milvus-storage/file/blob.h +++ b/cpp/include/milvus-storage/file/blob.h @@ -1,11 +1,11 @@ // Copyright 2023 Zilliz -// +// // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at -// +// // http://www.apache.org/licenses/LICENSE-2.0 -// +// // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. diff --git a/cpp/include/milvus-storage/file/file.h b/cpp/include/milvus-storage/file/file.h index bd6c3dd..10314b8 100644 --- a/cpp/include/milvus-storage/file/file.h +++ b/cpp/include/milvus-storage/file/file.h @@ -1,11 +1,11 @@ // Copyright 2023 Zilliz -// +// // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at -// +// // http://www.apache.org/licenses/LICENSE-2.0 -// +// // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. diff --git a/cpp/include/milvus-storage/file/fragment.h b/cpp/include/milvus-storage/file/fragment.h index 528bb36..0d71a7d 100644 --- a/cpp/include/milvus-storage/file/fragment.h +++ b/cpp/include/milvus-storage/file/fragment.h @@ -1,11 +1,11 @@ // Copyright 2023 Zilliz -// +// // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at -// +// // http://www.apache.org/licenses/LICENSE-2.0 -// +// // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. diff --git a/cpp/include/milvus-storage/filter/constant_filter.h b/cpp/include/milvus-storage/filter/constant_filter.h index a2e494d..64f2695 100644 --- a/cpp/include/milvus-storage/filter/constant_filter.h +++ b/cpp/include/milvus-storage/filter/constant_filter.h @@ -1,11 +1,11 @@ // Copyright 2023 Zilliz -// +// // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at -// +// // http://www.apache.org/licenses/LICENSE-2.0 -// +// // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. diff --git a/cpp/include/milvus-storage/filter/filter.h b/cpp/include/milvus-storage/filter/filter.h index 7f17005..7f9b184 100644 --- a/cpp/include/milvus-storage/filter/filter.h +++ b/cpp/include/milvus-storage/filter/filter.h @@ -1,11 +1,11 @@ // Copyright 2023 Zilliz -// +// // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at -// +// // http://www.apache.org/licenses/LICENSE-2.0 -// +// // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. diff --git a/cpp/include/milvus-storage/filter/value.h b/cpp/include/milvus-storage/filter/value.h index 4d76685..154adfa 100644 --- a/cpp/include/milvus-storage/filter/value.h +++ b/cpp/include/milvus-storage/filter/value.h @@ -1,11 +1,11 @@ // Copyright 2023 Zilliz -// +// // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at -// +// // http://www.apache.org/licenses/LICENSE-2.0 -// +// // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. diff --git a/cpp/include/milvus-storage/format/parquet/file_reader.h b/cpp/include/milvus-storage/format/parquet/file_reader.h index 02ec30c..9fdf8fc 100644 --- a/cpp/include/milvus-storage/format/parquet/file_reader.h +++ b/cpp/include/milvus-storage/format/parquet/file_reader.h @@ -1,11 +1,11 @@ // Copyright 2023 Zilliz -// +// // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at -// +// // http://www.apache.org/licenses/LICENSE-2.0 -// +// // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. diff --git a/cpp/include/milvus-storage/format/reader.h b/cpp/include/milvus-storage/format/reader.h index de70f49..981f153 100644 --- a/cpp/include/milvus-storage/format/reader.h +++ b/cpp/include/milvus-storage/format/reader.h @@ -1,11 +1,11 @@ // Copyright 2023 Zilliz -// +// // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at -// +// // http://www.apache.org/licenses/LICENSE-2.0 -// +// // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. diff --git a/cpp/include/milvus-storage/format/writer.h b/cpp/include/milvus-storage/format/writer.h index bf2fc6b..ff869e5 100644 --- a/cpp/include/milvus-storage/format/writer.h +++ b/cpp/include/milvus-storage/format/writer.h @@ -1,11 +1,11 @@ // Copyright 2023 Zilliz -// +// // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at -// +// // http://www.apache.org/licenses/LICENSE-2.0 -// +// // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. diff --git a/cpp/include/milvus-storage/reader/common/filter_reader.h b/cpp/include/milvus-storage/reader/common/filter_reader.h index 4f16956..29023b6 100644 --- a/cpp/include/milvus-storage/reader/common/filter_reader.h +++ b/cpp/include/milvus-storage/reader/common/filter_reader.h @@ -1,11 +1,11 @@ // Copyright 2023 Zilliz -// +// // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at -// +// // http://www.apache.org/licenses/LICENSE-2.0 -// +// // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -31,7 +31,7 @@ class FilterReader : public arrow::RecordBatchReader { arrow::Status ReadNext(std::shared_ptr* batch) override; static std::unique_ptr Make(std::unique_ptr reader, - const ReadOptions& option); + const ReadOptions& option); FilterReader(std::unique_ptr reader, const ReadOptions& option) : record_reader_(std::move(reader)), option_(option) {} diff --git a/cpp/include/milvus-storage/reader/multi_files_sequential_reader.h b/cpp/include/milvus-storage/reader/multi_files_sequential_reader.h index bf24538..b6094b6 100644 --- a/cpp/include/milvus-storage/reader/multi_files_sequential_reader.h +++ b/cpp/include/milvus-storage/reader/multi_files_sequential_reader.h @@ -27,6 +27,7 @@ class MultiFilesSequentialReader : public arrow::RecordBatchReader { MultiFilesSequentialReader(arrow::fs::FileSystem& fs, const FragmentVector& fragments, std::shared_ptr schema, + const SchemaOptions& schema_options, const ReadOptions& options); std::shared_ptr schema() const override; @@ -43,6 +44,7 @@ class MultiFilesSequentialReader : public arrow::RecordBatchReader { std::unique_ptr holding_file_reader_; // file reader have to outlive than record batch reader, so we hold here. const ReadOptions options_; + const SchemaOptions schema_options_; friend FilterQueryRecordReader; }; diff --git a/cpp/include/milvus-storage/reader/record_reader.h b/cpp/include/milvus-storage/reader/record_reader.h index 5d5ad60..73810bb 100644 --- a/cpp/include/milvus-storage/reader/record_reader.h +++ b/cpp/include/milvus-storage/reader/record_reader.h @@ -32,7 +32,8 @@ bool only_contain_vector_columns(std::shared_ptr schema, const std::set< bool filters_only_contain_pk_and_version(std::shared_ptr schema, const Filter::FilterSet& filters); std::unique_ptr MakeScanDataReader(std::shared_ptr manifest, - arrow::fs::FileSystem& fs); + arrow::fs::FileSystem& fs, + const ReadOptions& options = {}); std::unique_ptr MakeScanDeleteReader(std::shared_ptr manifest, arrow::fs::FileSystem& fs); diff --git a/cpp/include/milvus-storage/reader/scan_record_reader.h b/cpp/include/milvus-storage/reader/scan_record_reader.h index 80e1ac0..165c568 100644 --- a/cpp/include/milvus-storage/reader/scan_record_reader.h +++ b/cpp/include/milvus-storage/reader/scan_record_reader.h @@ -21,7 +21,8 @@ namespace milvus_storage { class ScanRecordReader : public arrow::RecordBatchReader { public: - ScanRecordReader(std::shared_ptr schema, + ScanRecordReader(std::shared_ptr schema, + const SchemaOptions& schema_options, const ReadOptions& options, arrow::fs::FileSystem& fs, const FragmentVector& fragments, @@ -34,7 +35,8 @@ class ScanRecordReader : public arrow::RecordBatchReader { private: Result> MakeInnerReader(); - std::shared_ptr schema_; + std::shared_ptr schema_; + const SchemaOptions schema_options_; const ReadOptions options_; arrow::fs::FileSystem& fs_; const FragmentVector fragments_; diff --git a/cpp/include/milvus-storage/storage/options.h b/cpp/include/milvus-storage/storage/options.h index 0f694c0..e971eab 100644 --- a/cpp/include/milvus-storage/storage/options.h +++ b/cpp/include/milvus-storage/storage/options.h @@ -34,9 +34,11 @@ struct WriteOption { struct ReadOptions { Filter::FilterSet filters; - std::vector columns; // must have pk and version + std::set columns; // empty means all columns // int limit = -1; int64_t version = INT64_MAX; + + static bool ReturnAllColumns(const ReadOptions& options) { return options.columns.empty(); } }; struct SchemaOptions { diff --git a/cpp/include/milvus-storage/storage/schema.h b/cpp/include/milvus-storage/storage/schema.h index 3d9f93f..423358c 100644 --- a/cpp/include/milvus-storage/storage/schema.h +++ b/cpp/include/milvus-storage/storage/schema.h @@ -1,11 +1,11 @@ // Copyright 2023 Zilliz -// +// // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at -// +// // http://www.apache.org/licenses/LICENSE-2.0 -// +// // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. diff --git a/cpp/include/milvus-storage/storage/space.h b/cpp/include/milvus-storage/storage/space.h index 224c6a1..15f4480 100644 --- a/cpp/include/milvus-storage/storage/space.h +++ b/cpp/include/milvus-storage/storage/space.h @@ -27,7 +27,6 @@ class RecordReader; class Space { public: - Status Write(arrow::RecordBatchReader& reader, const WriteOption& option); std::unique_ptr Read(const ReadOptions& option) const; @@ -36,7 +35,7 @@ class Space { std::unique_ptr ScanDelete() const; // Scan data files without filtering deleted data - std::unique_ptr ScanData() const; + std::unique_ptr ScanData(const std::set& columns = {}) const; Status Delete(arrow::RecordBatchReader& reader); diff --git a/cpp/src/common/arrow_util.cpp b/cpp/src/common/arrow_util.cpp index 547bc0d..2f3f806 100644 --- a/cpp/src/common/arrow_util.cpp +++ b/cpp/src/common/arrow_util.cpp @@ -14,6 +14,7 @@ #include "common/arrow_util.h" #include "common/macro.h" +#include "common/utils.h" namespace milvus_storage { Result> MakeArrowFileReader(arrow::fs::FileSystem& fs, @@ -26,25 +27,28 @@ Result> MakeArrowFileReader(arrow::f } Result> MakeArrowRecordBatchReader(parquet::arrow::FileReader& reader, + std::shared_ptr schema, + const SchemaOptions& schema_options, const ReadOptions& options) { + auto internal_options = CreateInternalReadOptions(schema, schema_options, options); auto metadata = reader.parquet_reader()->metadata(); std::vector row_group_indices; - std::vector column_indices; + std::set column_indices; - for (const auto& column_name : options.columns) { + for (const auto& column_name : internal_options.columns) { auto column_idx = metadata->schema()->ColumnIndex(column_name); - column_indices.emplace_back(column_idx); + column_indices.insert(column_idx); } - for (const auto& filter : options.filters) { + for (const auto& filter : internal_options.filters) { auto column_idx = metadata->schema()->ColumnIndex(filter->get_column_name()); - column_indices.emplace_back(column_idx); + column_indices.insert(column_idx); } for (int i = 0; i < metadata->num_row_groups(); ++i) { auto row_group_metadata = metadata->RowGroup(i); bool can_ignored = false; - for (const auto& filter : options.filters) { + for (const auto& filter : internal_options.filters) { auto column_idx = metadata->schema()->ColumnIndex(filter->get_column_name()); auto column_meta = row_group_metadata->ColumnChunk(column_idx); auto stats = column_meta->statistics(); @@ -63,8 +67,8 @@ Result> MakeArrowRecordBatchReader(par } std::unique_ptr record_reader; - // RETURN_ARROW_NOT_OK(reader->GetRecordBatchReader(row_group_indices, column_indices, &record_reader)); - RETURN_ARROW_NOT_OK(reader.GetRecordBatchReader(row_group_indices, &record_reader)); + RETURN_ARROW_NOT_OK( + reader.GetRecordBatchReader(row_group_indices, {column_indices.begin(), column_indices.end()}, &record_reader)); return record_reader; } diff --git a/cpp/src/common/utils.cpp b/cpp/src/common/utils.cpp index a4352c5..80c6ecb 100644 --- a/cpp/src/common/utils.cpp +++ b/cpp/src/common/utils.cpp @@ -15,6 +15,7 @@ #include "common/utils.h" #include #include +#include #include #include #include @@ -25,9 +26,28 @@ #include "common/macro.h" #include "arrow/filesystem/path_util.h" #include "boost/algorithm/string/predicate.hpp" +#include "storage/options.h" #include +#include "storage/schema.h" namespace milvus_storage { +ReadOptions CreateInternalReadOptions(std::shared_ptr schema, + const SchemaOptions& schema_options, + const ReadOptions& options) { + ReadOptions internal_option = options; + if (ReadOptions::ReturnAllColumns(options)) { + const auto& field_names = schema->field_names(); + internal_option.columns = + std::set(std::make_move_iterator(field_names.begin()), std::make_move_iterator(field_names.end())); + } else { + internal_option.columns.insert(schema_options.primary_column); + if (schema_options.has_version_column()) { + internal_option.columns.insert(schema_options.version_column); + } + } + return internal_option; +} + Result ToProtobufType(arrow::Type::type type) { auto type_id = static_cast(type); if (type_id < 0 || type_id >= static_cast(schema_proto::LogicType::MAX_ID)) { @@ -284,10 +304,13 @@ int64_t ParseVersionFromFileName(const std::string& path) { } Result> ProjectSchema(std::shared_ptr schema, - std::vector columns) { + const ReadOptions& options) { + if (ReadOptions::ReturnAllColumns(options)) { + return schema; + } std::vector> fields; for (auto const& field : schema->fields()) { - if (std::find(columns.begin(), columns.end(), field->name()) != columns.end()) { + if (options.columns.find(field->name()) != options.columns.end()) { fields.push_back(field); } } diff --git a/cpp/src/file/delete_fragment.cpp b/cpp/src/file/delete_fragment.cpp index abdc5a7..9de9c8d 100644 --- a/cpp/src/file/delete_fragment.cpp +++ b/cpp/src/file/delete_fragment.cpp @@ -48,9 +48,7 @@ Result DeleteFragment::Make(arrow::fs::FileSystem& fs, const Fragment& fragment) { DeleteFragment delete_fragment(fs, schema, fragment.id()); - ReadOptions opts; - opts.columns = schema->delete_schema()->field_names(); - MultiFilesSequentialReader rec_reader(fs, {fragment}, schema->delete_schema(), opts); + MultiFilesSequentialReader rec_reader(fs, {fragment}, schema->delete_schema(), schema->options(), {}); for (const auto& batch_rec : rec_reader) { ASSIGN_OR_RETURN_ARROW_NOT_OK(auto batch, batch_rec); delete_fragment.Add(batch); diff --git a/cpp/src/reader/common/projection_reader.cpp b/cpp/src/reader/common/projection_reader.cpp index 1b437cf..f6af726 100644 --- a/cpp/src/reader/common/projection_reader.cpp +++ b/cpp/src/reader/common/projection_reader.cpp @@ -31,7 +31,7 @@ Result> ProjectionReader::Make( std::shared_ptr schema, std ::unique_ptr reader, const ReadOptions& options) { - ASSIGN_OR_RETURN_NOT_OK(auto projection_schema, ProjectSchema(schema, options.columns)); + ASSIGN_OR_RETURN_NOT_OK(auto projection_schema, ProjectSchema(schema, options)); std::unique_ptr projection_reader = std::make_unique(projection_schema, std::move(reader), options); return projection_reader; @@ -55,9 +55,11 @@ arrow::Status ProjectionReader::ReadNext(std::shared_ptr* ba } std::vector> projection_cols; + const auto& schema_field_names = schema_->field_names(); for (int i = 0; i < tmp->num_columns(); ++i) { auto col_name = tmp->column_name(i); - if (std::find(options_.columns.begin(), options_.columns.end(), col_name) != options_.columns.end()) { + + if (std::find(schema_field_names.begin(), schema_field_names.end(), col_name) != schema_field_names.end()) { projection_cols.push_back(tmp->column(i)); } } diff --git a/cpp/src/reader/filter_query_record_reader.cpp b/cpp/src/reader/filter_query_record_reader.cpp index bc65598..1d121aa 100644 --- a/cpp/src/reader/filter_query_record_reader.cpp +++ b/cpp/src/reader/filter_query_record_reader.cpp @@ -47,7 +47,7 @@ FilterQueryRecordReader::FilterQueryRecordReader(const ReadOptions& options, assert(scalar_files_.size() == vector_files_.size()); } std::shared_ptr FilterQueryRecordReader::schema() const { - auto r = ProjectSchema(schema_->schema(), options_.columns); + auto r = ProjectSchema(schema_->schema(), options_); if (!r.ok()) { return nullptr; } @@ -94,7 +94,9 @@ Result> FilterQueryRecordReader::MakeI auto scalar_file = scalar_files_[next_pos_], vector_file = vector_files_[next_pos_]; ASSIGN_OR_RETURN_NOT_OK(holding_scalar_file_reader_, MakeArrowFileReader(fs_, scalar_file)); ASSIGN_OR_RETURN_NOT_OK(holding_vector_file_reader_, MakeArrowFileReader(fs_, vector_file)); - ASSIGN_OR_RETURN_NOT_OK(auto scalar_rec_reader, MakeArrowRecordBatchReader(*holding_scalar_file_reader_, options_)); + ASSIGN_OR_RETURN_NOT_OK( + auto scalar_rec_reader, + MakeArrowRecordBatchReader(*holding_scalar_file_reader_, schema_->scalar_schema(), schema_->options(), options_)); auto current_vector_reader = std::make_unique(std::move(holding_vector_file_reader_)); auto combine_reader = diff --git a/cpp/src/reader/merge_record_reader.cpp b/cpp/src/reader/merge_record_reader.cpp index 7f6e491..ca577f6 100644 --- a/cpp/src/reader/merge_record_reader.cpp +++ b/cpp/src/reader/merge_record_reader.cpp @@ -36,12 +36,14 @@ MergeRecordReader::MergeRecordReader(const ReadOptions& options, arrow::fs::FileSystem& fs, std::shared_ptr schema) : schema_(schema), fs_(fs), options_(options), delete_fragments_(delete_fragments) { - scalar_reader_ = std::make_unique(fs, scalar_fragments, schema->scalar_schema(), options); - vector_reader_ = std::make_unique(fs, vector_fragments, schema->vector_schema(), options); + scalar_reader_ = std::make_unique(fs, scalar_fragments, schema->scalar_schema(), + schema->options(), options); + vector_reader_ = std::make_unique(fs, vector_fragments, schema->vector_schema(), + schema->options(), options); } std::shared_ptr MergeRecordReader::schema() const { - auto r = ProjectSchema(schema_->schema(), options_.columns); + auto r = ProjectSchema(schema_->schema(), options_); return r.ok() ? r.value() : nullptr; } diff --git a/cpp/src/reader/multi_files_sequential_reader.cpp b/cpp/src/reader/multi_files_sequential_reader.cpp index 7542bc4..a12c39c 100644 --- a/cpp/src/reader/multi_files_sequential_reader.cpp +++ b/cpp/src/reader/multi_files_sequential_reader.cpp @@ -22,8 +22,9 @@ namespace milvus_storage { MultiFilesSequentialReader::MultiFilesSequentialReader(arrow::fs::FileSystem& fs, const FragmentVector& fragments, std::shared_ptr schema, + const SchemaOptions& schema_options, const ReadOptions& options) - : fs_(fs), schema_(std::move(schema)), options_(options) { + : fs_(fs), schema_(std::move(schema)), schema_options_(schema_options), options_(options) { for (const auto& fragment : fragments) { files_.insert(files_.end(), fragment.files().begin(), fragment.files().end()); } @@ -45,7 +46,7 @@ arrow::Status MultiFilesSequentialReader::ReadNext(std::shared_ptr MakeRecordReader(std::shared_ptr(schema, options, fs, data_fragments, delete_fragments); + auto scan_schema = only_scalar ? schema->scalar_schema() : schema->vector_schema(); + return std::make_unique(scan_schema, schema->options(), options, fs, data_fragments, + delete_fragments); } if (filters_only_contain_pk_and_version(schema, options.filters)) { @@ -101,11 +103,14 @@ bool filters_only_contain_pk_and_version(std::shared_ptr schema, const F } std::unique_ptr MakeScanDataReader(std::shared_ptr manifest, - arrow::fs::FileSystem& fs) { + arrow::fs::FileSystem& fs, + const ReadOptions& options) { auto scalar_reader = std::make_unique(fs, manifest->scalar_fragments(), - manifest->schema()->scalar_schema(), ReadOptions()); + manifest->schema()->scalar_schema(), + manifest->schema()->options(), ReadOptions()); auto vector_reader = std::make_unique(fs, manifest->vector_fragments(), - manifest->schema()->vector_schema(), ReadOptions()); + manifest->schema()->vector_schema(), + manifest->schema()->options(), ReadOptions()); return CombineReader::Make(std::move(scalar_reader), std::move(vector_reader), manifest->schema()); } @@ -113,7 +118,8 @@ std::unique_ptr MakeScanDataReader(std::shared_ptr MakeScanDeleteReader(std::shared_ptr manifest, arrow::fs::FileSystem& fs) { return std::make_unique(fs, manifest->delete_fragments(), - manifest->schema()->delete_schema(), ReadOptions()); + manifest->schema()->delete_schema(), + manifest->schema()->options(), ReadOptions()); } } // namespace internal } // namespace milvus_storage diff --git a/cpp/src/reader/scan_record_reader.cpp b/cpp/src/reader/scan_record_reader.cpp index a235dab..93a6405 100644 --- a/cpp/src/reader/scan_record_reader.cpp +++ b/cpp/src/reader/scan_record_reader.cpp @@ -23,20 +23,27 @@ namespace milvus_storage { -ScanRecordReader::ScanRecordReader(std::shared_ptr schema, +ScanRecordReader::ScanRecordReader(std::shared_ptr schema, + const SchemaOptions& schema_options, const ReadOptions& options, arrow::fs::FileSystem& fs, const FragmentVector& fragments, const DeleteFragmentVector& delete_fragments) - : schema_(schema), options_(options), fs_(fs), fragments_(fragments), delete_fragments_(delete_fragments) {} + : schema_(schema), + schema_options_(schema_options), + options_(options), + fs_(fs), + fragments_(fragments), + delete_fragments_(delete_fragments) {} std::shared_ptr ScanRecordReader::schema() const { - auto r = ProjectSchema(schema_->schema(), options_.columns); + auto r = ProjectSchema(schema_, options_); if (!r.ok()) { return nullptr; } return r.value(); } + arrow::Status ScanRecordReader::ReadNext(std::shared_ptr* batch) { if (reader_ == nullptr) { auto res = MakeInnerReader(); @@ -50,11 +57,10 @@ arrow::Status ScanRecordReader::ReadNext(std::shared_ptr* ba } Result> ScanRecordReader::MakeInnerReader() { - auto reader = std::make_unique(fs_, fragments_, schema_->schema(), options_); + auto reader = std::make_unique(fs_, fragments_, schema_, schema_options_, options_); auto filter_reader = FilterReader::Make(std::move(reader), options_); - auto delete_reader = - DeleteMergeReader::Make(std::move(filter_reader), schema_->options(), delete_fragments_, options_); - ASSIGN_OR_RETURN_NOT_OK(auto res, ProjectionReader::Make(schema_->schema(), std::move(delete_reader), options_)); + auto delete_reader = DeleteMergeReader::Make(std::move(filter_reader), schema_options_, delete_fragments_, options_); + ASSIGN_OR_RETURN_NOT_OK(auto res, ProjectionReader::Make(schema_, std::move(delete_reader), options_)); return res; } } // namespace milvus_storage diff --git a/cpp/src/storage/space.cpp b/cpp/src/storage/space.cpp index 10ee896..f354093 100644 --- a/cpp/src/storage/space.cpp +++ b/cpp/src/storage/space.cpp @@ -309,8 +309,8 @@ std::unique_ptr Space::ScanDelete() const { return internal::MakeScanDeleteReader(manifest_, *fs_); } -std::unique_ptr Space::ScanData() const { - return internal::MakeScanDataReader(manifest_, *fs_); +std::unique_ptr Space::ScanData(const std::set& columns) const { + return internal::MakeScanDataReader(manifest_, *fs_, ReadOptions{.columns = columns}); } std::shared_ptr Space::schema() const { return manifest_->schema(); } diff --git a/cpp/test/common/arrow_utils_test.cpp b/cpp/test/common/arrow_utils_test.cpp index 9798c83..7b23933 100644 --- a/cpp/test/common/arrow_utils_test.cpp +++ b/cpp/test/common/arrow_utils_test.cpp @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include #include #include "common/arrow_util.h" #include "common/fs_util.h" @@ -35,9 +36,10 @@ TEST_F(ArrowUtilsTest, TestMakeArrowRecordBatchReader) { std::string out; ASSERT_AND_ASSIGN(auto fs, BuildFileSystem("file://" + path_.string(), &out)); auto file_path = path_.string() + "/test.parquet"; - ASSERT_STATUS_OK(PrepareSimpleParquetFile(*fs, file_path, 1)); + auto schema = CreateArrowSchema({"f_int64"}, {arrow::int64()}); + ASSERT_STATUS_OK(PrepareSimpleParquetFile(schema, *fs, file_path, 1)); ASSERT_AND_ASSIGN(auto file_reader, MakeArrowFileReader(*fs, file_path)); - ASSERT_AND_ASSIGN(auto batch_reader, MakeArrowRecordBatchReader(*file_reader)); + ASSERT_AND_ASSIGN(auto batch_reader, MakeArrowRecordBatchReader(*file_reader, schema, {.primary_column = "f_int64"})); ASSERT_AND_ARROW_ASSIGN(auto batch, batch_reader->Next()); ASSERT_EQ(1, batch->num_rows()); } diff --git a/cpp/test/include/test_util.h b/cpp/test/include/test_util.h index 3609d43..7b99d87 100644 --- a/cpp/test/include/test_util.h +++ b/cpp/test/include/test_util.h @@ -49,5 +49,8 @@ namespace milvus_storage { std::shared_ptr CreateArrowSchema(std::vector field_names, std::vector> field_types); -Status PrepareSimpleParquetFile(arrow::fs::FileSystem& fs, const std::string& file_path, int num_rows); +Status PrepareSimpleParquetFile(std::shared_ptr schema, + arrow::fs::FileSystem& fs, + const std::string& file_path, + int num_rows); } // namespace milvus_storage diff --git a/cpp/test/multi_files_sequential_reader_test.cpp b/cpp/test/multi_files_sequential_reader_test.cpp index cdcb657..1a22b87 100644 --- a/cpp/test/multi_files_sequential_reader_test.cpp +++ b/cpp/test/multi_files_sequential_reader_test.cpp @@ -55,9 +55,9 @@ TEST(MultiFilesSeqReaderTest, ReadTest) { Fragment frag(1); frag.add_file("/tmp/file1"); frag.add_file("/tmp/file2"); - ReadOptions opt; - opt.columns.emplace_back("pk_field"); - MultiFilesSequentialReader r(*fs, {frag}, arrow_schema, opt); + ReadOptions opt{.columns = {"pk_field"}}; + SchemaOptions schema_options{.primary_column = "pk_field"}; + MultiFilesSequentialReader r(*fs, {frag}, arrow_schema, schema_options, opt); ASSERT_AND_ARROW_ASSIGN(auto table, r.ToTable()); ASSERT_AND_ARROW_ASSIGN(auto combined_table, table->CombineChunks()); auto pk_res = std::dynamic_pointer_cast(combined_table->GetColumnByName("pk_field")->chunk(0)); diff --git a/cpp/test/space_test.cpp b/cpp/test/space_test.cpp index 2ccacb5..aa32993 100644 --- a/cpp/test/space_test.cpp +++ b/cpp/test/space_test.cpp @@ -77,7 +77,7 @@ TEST(SpaceTest, SpaceWriteReadTest) { ConstantFilter filter(EQUAL, "pk_field", Value::Int64(1)); ReadOptions read_options; read_options.filters.push_back(&filter); - read_options.columns.emplace_back("pk_field"); + read_options.columns.insert("pk_field"); auto res_reader = space->Read(read_options); ASSERT_AND_ARROW_ASSIGN(auto table, res_reader->ToTable()); auto pk_chunk_arr = table->GetColumnByName("pk_field"); diff --git a/cpp/test/test_util.cpp b/cpp/test/test_util.cpp index 2caa692..e04b781 100644 --- a/cpp/test/test_util.cpp +++ b/cpp/test/test_util.cpp @@ -26,8 +26,11 @@ std::shared_ptr CreateArrowSchema(std::vector field_ return std::make_shared(fields); } -Status PrepareSimpleParquetFile(arrow::fs::FileSystem& fs, const std::string& file_path, int num_rows) { - auto schema = CreateArrowSchema({"f_int64"}, {arrow::int64()}); +Status PrepareSimpleParquetFile(std::shared_ptr schema, + arrow::fs::FileSystem& fs, + const std::string& file_path, + int num_rows) { + // TODO: parse schema and generate data ParquetFileWriter w(schema, fs, file_path); w.Init(); arrow::Int64Builder builder;