diff --git a/cpp/src/file/delete_fragment.cpp b/cpp/src/file/delete_fragment.cpp index 1bf9d252..356be417 100644 --- a/cpp/src/file/delete_fragment.cpp +++ b/cpp/src/file/delete_fragment.cpp @@ -33,7 +33,10 @@ Result DeleteFragment::Make(std::shared_ptr schema, const Fragment& fragment) { DeleteFragment delete_fragment; - MultiFilesSequentialReader rec_reader(fs, {fragment}, schema->delete_schema()); + + auto opts = std::make_shared(); + opts->columns = schema->delete_schema()->field_names(); + MultiFilesSequentialReader rec_reader(fs, {fragment}, schema->delete_schema(), opts); for (const auto& batch_rec : rec_reader) { ASSIGN_OR_RETURN_ARROW_NOT_OK(auto batch, batch_rec); delete_fragment.Add(batch); @@ -42,13 +45,13 @@ Result DeleteFragment::Make(std::shared_ptr versions = data_.at(pk); for (auto i : versions) { - if (i >= version) { + if (i >= version && i <= max_version) { return true; } } diff --git a/cpp/src/file/delete_fragment.h b/cpp/src/file/delete_fragment.h index 735303a8..b396e567 100644 --- a/cpp/src/file/delete_fragment.h +++ b/cpp/src/file/delete_fragment.h @@ -25,7 +25,7 @@ class DeleteFragment { void set_id(int64_t id) { id_ = id; } // Return true if this pk at this version have been deleted - bool Filter(pk_type& pk, std::int64_t version); + bool Filter(pk_type& pk, int64_t version, int64_t max_version = INT64_MAX); // Return true if this pk have been deleted bool Filter(pk_type& pk); diff --git a/cpp/src/filter/filter.h b/cpp/src/filter/filter.h index 9ed55610..35fa5f73 100644 --- a/cpp/src/filter/filter.h +++ b/cpp/src/filter/filter.h @@ -31,6 +31,8 @@ class Filter { return Status::OK(); } + virtual ~Filter(){}; + protected: std::string column_name_; }; diff --git a/cpp/src/filter/value.cpp b/cpp/src/filter/value.cpp index 180d8bb7..9cb2b79e 100644 --- a/cpp/src/filter/value.cpp +++ b/cpp/src/filter/value.cpp @@ -44,6 +44,7 @@ std::string Value::get_value() const { return string_value_; } +// TODO: add cast bool Value::operator==(const Value& other) const { return TemplateBooleanOperation(*this, other); } bool Value::operator!=(const Value& other) const { return TemplateBooleanOperation(*this, other); } @@ -57,4 +58,46 @@ bool Value::operator<=(const Value& other) const { return TemplateBooleanOperati bool Value::operator>(const Value& other) const { return TemplateBooleanOperation(*this, other); } bool Value::operator<(const Value& other) const { return TemplateBooleanOperation(*this, other); } -} // namespace milvus_storage \ No newline at end of file + +Value Value::Bool(bool value) { + Value v; + v.type_ = BOOLEAN; + v.value_.bool_value_ = value; + return v; +} + +Value Value::Int32(int32_t value) { + Value v; + v.type_ = INT32; + v.value_.int32_value_ = value; + return v; +} + +Value Value::Int64(int64_t value) { + Value v; + v.type_ = INT64; + v.value_.int64_value_ = value; + return v; +} + +Value Value::Float(float value) { + Value v; + v.type_ = FLOAT; + v.value_.float_value_ = value; + return v; +} + +Value Value::Double(double value) { + Value v; + v.type_ = DOUBLE; + v.value_.double_value_ = value; + return v; +} + +Value Value::String(std::string value) { + Value v; + v.type_ = STRING; + v.string_value_ = value; + return v; +} +} // namespace milvus_storage diff --git a/cpp/src/filter/value.h b/cpp/src/filter/value.h index 13f633cb..dd13e3f8 100644 --- a/cpp/src/filter/value.h +++ b/cpp/src/filter/value.h @@ -17,12 +17,14 @@ enum LogicType { }; class Value { public: - Value(int32_t value) { value_.int32_value_ = value; } // NOLINT - Value(int64_t value) { value_.int64_value_ = value; } // NOLINT - Value(bool value) { value_.bool_value_ = value; } // NOLINT - Value(float value) { value_.float_value_ = value; } // NOLINT - Value(double value) { value_.double_value_ = value; } // NOLINT - explicit Value(LogicType type) : type_(type) {} // NOLINT + Value() = default; + Value(int32_t value) : type_(INT32) { value_.int32_value_ = value; } // NOLINT + Value(int64_t value) : type_(INT64) { value_.int64_value_ = value; } // NOLINT + Value(bool value) : type_(BOOLEAN) { value_.bool_value_ = value; } // NOLINT + Value(float value) : type_(FLOAT) { value_.float_value_ = value; } // NOLINT + Value(double value) : type_(DOUBLE) { value_.double_value_ = value; } // NOLINT + Value(std::string value) : type_(STRING) { string_value_ = value; } // NOLINT + explicit Value(LogicType type) : type_(type) {} // NOLINT LogicType get_logic_type() const { return type_; } @@ -36,6 +38,13 @@ class Value { bool operator>(const Value& other) const; bool operator<(const Value& other) const; + static Value Bool(bool value); + static Value Int32(int32_t value); + static Value Int64(int64_t value); + static Value Float(float value); + static Value Double(double value); + static Value String(std::string value); + private: union Val { bool bool_value_; @@ -142,4 +151,5 @@ static bool TemplateBooleanOperation(const Value& a, const Value& b) { } return false; } -} // namespace milvus_storage \ No newline at end of file + +} // namespace milvus_storage diff --git a/cpp/src/reader/common/delete_reader.cpp b/cpp/src/reader/common/delete_reader.cpp index b4c0cf5d..ccc8a302 100644 --- a/cpp/src/reader/common/delete_reader.cpp +++ b/cpp/src/reader/common/delete_reader.cpp @@ -3,7 +3,8 @@ namespace milvus_storage { std::shared_ptr DeleteMergeReader::Make(std::shared_ptr reader, std::shared_ptr schema_options, - const DeleteFragmentVector& delete_fragments) { + const DeleteFragmentVector& delete_fragments, + std::shared_ptr options) { // DeleteFragmentVector filtered_delete_fragments; // for (auto& delete_fragment : delete_fragments) { // if (schema_options->has_version_column() || delete_fragment.id() > fragment_id) { @@ -12,16 +13,16 @@ std::shared_ptr DeleteMergeReader::Make(std::shared_ptr(reader, delete_fragments, schema_options); + return std::make_shared(reader, delete_fragments, schema_options, options); } std::shared_ptr DeleteMergeReader::schema() const { return reader_->schema(); } arrow::Status DeleteMergeReader::ReadNext(std::shared_ptr* batch) { while (true) { - if (!filtered_batch_reader_) { + if (filtered_batch_reader_) { auto b = filtered_batch_reader_->Next(); - if (!b) { + if (b) { *batch = b; return arrow::Status::OK(); } @@ -40,7 +41,8 @@ arrow::Status DeleteMergeReader::ReadNext(std::shared_ptr* b if (version_col == nullptr) { return arrow::Status::Invalid("Version column not found"); } - auto visitor = DeleteFilterVisitor(delete_fragments_, std::static_pointer_cast(version_col)); + auto visitor = DeleteFilterVisitor(delete_fragments_, std::static_pointer_cast(version_col), + options_->version); auto pk_col = record_batch->GetColumnByName(schema_options_->primary_column); if (pk_col == nullptr) { diff --git a/cpp/src/reader/common/delete_reader.h b/cpp/src/reader/common/delete_reader.h index 926a3a5d..2ce63651 100644 --- a/cpp/src/reader/common/delete_reader.h +++ b/cpp/src/reader/common/delete_reader.h @@ -1,5 +1,6 @@ #pragma once +#include #include #include "arrow/record_batch.h" #include "arrow/array/array_primitive.h" @@ -17,23 +18,27 @@ class DeleteMergeReader : public arrow::RecordBatchReader { static std::shared_ptr Make(std::shared_ptr reader, std::shared_ptr schema_options, - const DeleteFragmentVector& delete_fragments); + const DeleteFragmentVector& delete_fragments, + std::shared_ptr options); std::shared_ptr schema() const override; arrow::Status ReadNext(std::shared_ptr* batch) override; DeleteMergeReader(std::shared_ptr reader, DeleteFragmentVector delete_fragments, - std::shared_ptr schema_options) + std::shared_ptr schema_options, + std::shared_ptr options) : reader_(std::move(reader)), delete_fragments_(std::move(delete_fragments)), - schema_options_(std::move(schema_options)) {} + schema_options_(std::move(schema_options)), + options_(options) {} private: std::shared_ptr reader_; std::shared_ptr filtered_batch_reader_; DeleteFragmentVector delete_fragments_; std::shared_ptr schema_options_; + std::shared_ptr options_; }; // RecordBatchWithDeltedOffsets is reader helper to fetch records not deleted without copy @@ -54,8 +59,9 @@ class DeleteMergeReader::RecordBatchWithDeltedOffsets { class DeleteMergeReader::DeleteFilterVisitor : public arrow::ArrayVisitor { public: explicit DeleteFilterVisitor(DeleteFragmentVector delete_fragments, - std::shared_ptr version_col = nullptr) - : version_col_(std::move(version_col)), delete_fragments_(std::move(delete_fragments)){}; + std::shared_ptr version_col = nullptr, + int64_t version = -1) + : version_col_(std::move(version_col)), delete_fragments_(std::move(delete_fragments)), version_(version){}; arrow::Status Visit(const arrow::Int64Array& array) override; arrow::Status Visit(const arrow::StringArray& array) override; @@ -69,7 +75,7 @@ class DeleteMergeReader::DeleteFilterVisitor : public arrow::ArrayVisitor { pk_type pk = array.Value(i); for (auto& delete_fragment : delete_fragments_) { if (version_col_ != nullptr) { - if (delete_fragment.Filter(pk, version_col_->Value(i))) { + if (delete_fragment.Filter(pk, version_col_->Value(i), version_)) { offsets_.push_back(i); break; } @@ -88,5 +94,6 @@ class DeleteMergeReader::DeleteFilterVisitor : public arrow::ArrayVisitor { std::shared_ptr version_col_; DeleteFragmentVector delete_fragments_; std::vector offsets_; + int64_t version_; }; } // namespace milvus_storage diff --git a/cpp/src/reader/common/filter_reader.cpp b/cpp/src/reader/common/filter_reader.cpp index fe9ed2db..f220e986 100644 --- a/cpp/src/reader/common/filter_reader.cpp +++ b/cpp/src/reader/common/filter_reader.cpp @@ -2,6 +2,7 @@ #include #include "arrow/record_batch.h" #include "arrow/table.h" +#include "common/log.h" #include #include @@ -28,7 +29,7 @@ arrow::Status FilterReader::ReadNext(std::shared_ptr* batch) current_filtered_batch_reader_ = nullptr; continue; } - *batch = std::move(filtered_batch); + *batch = filtered_batch; return arrow::Status::OK(); } auto s = NextFilteredBatchReader(); @@ -85,6 +86,7 @@ arrow::Status FilterReader::NextFilteredBatchReader() { if (!rec_batch) { break; } + filtered_batches = ApplyFilter(rec_batch, option_->filters); } while (filtered_batches.empty()); diff --git a/cpp/src/reader/filter_query_record_reader.cpp b/cpp/src/reader/filter_query_record_reader.cpp index a77d2ec9..07c13126 100644 --- a/cpp/src/reader/filter_query_record_reader.cpp +++ b/cpp/src/reader/filter_query_record_reader.cpp @@ -89,7 +89,7 @@ Result> FilterQueryRecordReader::MakeI CombineOffsetReader::Make(scalar_rec_reader, current_vector_reader, schema_)); ASSIGN_OR_RETURN_NOT_OK(auto filter_reader, FilterReader::Make(combine_reader, options_)); std::shared_ptr delete_reader = - DeleteMergeReader::Make(filter_reader, schema_->options(), delete_fragments_); + DeleteMergeReader::Make(filter_reader, schema_->options(), delete_fragments_, options_); ASSIGN_OR_RETURN_NOT_OK(auto projection_reader, ProjectionReader::Make(schema_->schema(), delete_reader, options_)); next_pos_++; diff --git a/cpp/src/reader/merge_record_reader.cpp b/cpp/src/reader/merge_record_reader.cpp index e8b7188e..ea99a6bf 100644 --- a/cpp/src/reader/merge_record_reader.cpp +++ b/cpp/src/reader/merge_record_reader.cpp @@ -23,8 +23,8 @@ MergeRecordReader::MergeRecordReader(std::shared_ptr options, std::shared_ptr fs, std::shared_ptr schema) : schema_(schema), fs_(fs), options_(options), delete_fragments_(delete_fragments) { - scalar_reader_ = std::make_shared(fs, scalar_fragments, schema->scalar_schema()); - vector_reader_ = std::make_shared(fs, vector_fragments, schema->vector_schema()); + scalar_reader_ = std::make_shared(fs, scalar_fragments, schema->scalar_schema(), options); + vector_reader_ = std::make_shared(fs, vector_fragments, schema->vector_schema(), options); } std::shared_ptr MergeRecordReader::schema() const { @@ -61,7 +61,7 @@ arrow::Status MergeRecordReader::ReadNext(std::shared_ptr* b Result> MergeRecordReader::MakeInnerReader() { ASSIGN_OR_RETURN_NOT_OK(auto combine_reader, CombineReader::Make(scalar_reader_, vector_reader_, schema_)); - auto delete_reader = DeleteMergeReader::Make(combine_reader, schema_->options(), delete_fragments_); + auto delete_reader = DeleteMergeReader::Make(combine_reader, schema_->options(), delete_fragments_, options_); ASSIGN_OR_RETURN_NOT_OK(auto res, ProjectionReader::Make(schema(), delete_reader, options_)); return res; } diff --git a/cpp/src/reader/multi_files_sequential_reader.cpp b/cpp/src/reader/multi_files_sequential_reader.cpp index aa2121a1..bffa143f 100644 --- a/cpp/src/reader/multi_files_sequential_reader.cpp +++ b/cpp/src/reader/multi_files_sequential_reader.cpp @@ -9,8 +9,9 @@ namespace milvus_storage { MultiFilesSequentialReader::MultiFilesSequentialReader(std::shared_ptr fs, const FragmentVector& fragments, - std::shared_ptr schema) - : fs_(fs), schema_(std::move(schema)) { + std::shared_ptr schema, + std::shared_ptr options) + : fs_(fs), schema_(std::move(schema)), options_(options) { for (const auto& fragment : fragments) { files_.insert(files_.end(), fragment.files().begin(), fragment.files().end()); } @@ -26,13 +27,13 @@ arrow::Status MultiFilesSequentialReader::ReadNext(std::shared_ptr fs, const FragmentVector& fragments, - std::shared_ptr schema); + std::shared_ptr schema, + std::shared_ptr options); std::shared_ptr schema() const override; @@ -24,10 +25,11 @@ class MultiFilesSequentialReader : public arrow::RecordBatchReader { std::shared_ptr schema_; std::vector files_; - size_t next_pos_; + size_t next_pos_ = 0; std::shared_ptr curr_reader_; std::shared_ptr holding_file_reader_; // file reader have to outlive than record batch reader, so we hold here. + std::shared_ptr options_; friend FilterQueryRecordReader; }; diff --git a/cpp/src/reader/scan_record_reader.cpp b/cpp/src/reader/scan_record_reader.cpp index d2edcdca..34742ffb 100644 --- a/cpp/src/reader/scan_record_reader.cpp +++ b/cpp/src/reader/scan_record_reader.cpp @@ -23,12 +23,22 @@ std::shared_ptr ScanRecordReader::schema() const { } return r.value(); } -arrow::Status ScanRecordReader::ReadNext(std::shared_ptr* batch) {} +arrow::Status ScanRecordReader::ReadNext(std::shared_ptr* batch) { + if (reader_ == nullptr) { + auto res = MakeInnerReader(); + if (!res.ok()) { + return arrow::Status::UnknownError(res.status().ToString()); + } + reader_ = res.value(); + } + + return reader_->ReadNext(batch); +} Result> ScanRecordReader::MakeInnerReader() { - auto reader = std::make_shared(fs_, fragments_, schema_->schema()); + auto reader = std::make_shared(fs_, fragments_, schema_->schema(), options_); ASSIGN_OR_RETURN_NOT_OK(auto filter_reader, FilterReader::Make(reader, options_)); - auto delete_reader = DeleteMergeReader::Make(filter_reader, schema_->options(), delete_fragments_); + auto delete_reader = DeleteMergeReader::Make(filter_reader, schema_->options(), delete_fragments_, options_); ASSIGN_OR_RETURN_NOT_OK(auto res, ProjectionReader::Make(schema_->schema(), delete_reader, options_)); return res; } diff --git a/cpp/src/storage/options.h b/cpp/src/storage/options.h index 1fc4ae97..9ff39420 100644 --- a/cpp/src/storage/options.h +++ b/cpp/src/storage/options.h @@ -20,7 +20,7 @@ struct ReadOptions { FilterSet filters; std::vector columns; // must have pk and version int limit = -1; - int version = -1; + int64_t version = -1; static ReadOptions& default_read_options() { static ReadOptions options; diff --git a/cpp/src/storage/space.cpp b/cpp/src/storage/space.cpp index f828d504..68c9f57d 100644 --- a/cpp/src/storage/space.cpp +++ b/cpp/src/storage/space.cpp @@ -3,6 +3,7 @@ #include #include #include +#include #include #include #include @@ -165,10 +166,10 @@ Status Space::Delete(arrow::RecordBatchReader* reader) { } std::unique_ptr Space::Read(std::shared_ptr option) { - if (option->has_version()) { - assert(manifest_->schema()->options()->has_version_column()); - option->filters.push_back(std::make_unique( - ComparisonType::GREATER_EQUAL, manifest_->schema()->options()->version_column, option->version)); + if (manifest_->schema()->options()->has_version_column()) { + option->filters.push_back(std::make_unique(ComparisonType::LESS_EQUAL, + manifest_->schema()->options()->version_column, + option->has_version() ? option->version : INT64_MAX)); } // TODO: remove second argument return RecordReader::MakeRecordReader(manifest_, manifest_->schema(), fs_, delete_fragments_, option); diff --git a/cpp/src/storage/space.h b/cpp/src/storage/space.h index 42292e03..078885bf 100644 --- a/cpp/src/storage/space.h +++ b/cpp/src/storage/space.h @@ -1,7 +1,6 @@ #pragma once #include #include -#include "arrow/filesystem/filesystem.h" #include "storage/manifest.h" #include "storage/space.h" diff --git a/cpp/test/space_test.cpp b/cpp/test/space_test.cpp index f05a5dc3..fe4e8031 100644 --- a/cpp/test/space_test.cpp +++ b/cpp/test/space_test.cpp @@ -3,6 +3,7 @@ #include "filter/constant_filter.h" #include +#include #include #include #include @@ -10,6 +11,7 @@ #include #include #include "test_util.h" +#include "arrow/table.h" namespace milvus_storage { /** @@ -57,6 +59,19 @@ TEST(SpaceTest, SpaceWriteReadTest) { auto write_option = WriteOption{10}; space->Write(reader.get(), &write_option); + + std::unique_ptr filter = std::make_unique(EQUAL, "pk_field", Value::Int64(1)); + auto read_options = std::make_shared(); + read_options->filters.push_back(std::move(filter)); + read_options->columns.emplace_back("pk_field"); + auto res_reader = space->Read(read_options); + ASSERT_AND_ARROW_ASSIGN(auto table, res_reader->ToTable()); + auto pk_chunk_arr = table->GetColumnByName("pk_field"); + ASSERT_EQ(pk_chunk_arr->length(), 1); + auto pk_chunk = pk_chunk_arr->chunk(0); + ASSERT_EQ(pk_chunk->length(), 1); + auto pk_arr = dynamic_cast(pk_chunk.get()); + ASSERT_EQ(1, pk_arr->Value(0)); } /** * @brief Test Space::Read diff --git a/cpp/test/test_util.h b/cpp/test/test_util.h index f9454307..1f7b3441 100644 --- a/cpp/test/test_util.h +++ b/cpp/test/test_util.h @@ -23,6 +23,12 @@ namespace milvus_storage { #define ASSERT_AND_ASSIGN(lhs, rexpr) ASSERT_AND_ASSIGN_IMPL(CONCAT(_tmp_value, __COUNTER__), lhs, rexpr); +#define ASSERT_AND_ARROW_ASSIGN_IMPL(status_name, lhs, rexpr) \ + auto status_name = (rexpr); \ + ASSERT_STATUS_OK(status_name.status()); \ + lhs = std::move(status_name).ValueUnsafe(); + +#define ASSERT_AND_ARROW_ASSIGN(lhs, rexpr) ASSERT_AND_ARROW_ASSIGN_IMPL(CONCAT(_tmp_value, __COUNTER__), lhs, rexpr); std::shared_ptr CreateArrowSchema(std::vector field_names, std::vector> field_types); } // namespace milvus_storage