Skip to content

Commit

Permalink
Fix some read bugs and add a simple read test (#33)
Browse files Browse the repository at this point in the history
Signed-off-by: sunby <[email protected]>
  • Loading branch information
sunby committed Jul 27, 2023
1 parent a7f850b commit c59da41
Show file tree
Hide file tree
Showing 18 changed files with 146 additions and 43 deletions.
9 changes: 6 additions & 3 deletions cpp/src/file/delete_fragment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,10 @@ Result<DeleteFragment> DeleteFragment::Make(std::shared_ptr<arrow::fs::FileSyste
std::shared_ptr<Schema> schema,
const Fragment& fragment) {
DeleteFragment delete_fragment;
MultiFilesSequentialReader rec_reader(fs, {fragment}, schema->delete_schema());

auto opts = std::make_shared<ReadOptions>();
opts->columns = schema->delete_schema()->field_names();
MultiFilesSequentialReader rec_reader(fs, {fragment}, schema->delete_schema(), opts);
for (const auto& batch_rec : rec_reader) {
ASSIGN_OR_RETURN_ARROW_NOT_OK(auto batch, batch_rec);
delete_fragment.Add(batch);
Expand All @@ -42,13 +45,13 @@ Result<DeleteFragment> DeleteFragment::Make(std::shared_ptr<arrow::fs::FileSyste
return delete_fragment;
}

bool DeleteFragment::Filter(pk_type& pk, std::int64_t version) {
bool DeleteFragment::Filter(pk_type& pk, int64_t version, int64_t max_version) {
if (data_.find(pk) == data_.end()) {
return false;
}
std::vector<int64_t> versions = data_.at(pk);
for (auto i : versions) {
if (i >= version) {
if (i >= version && i <= max_version) {
return true;
}
}
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/file/delete_fragment.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class DeleteFragment {
void set_id(int64_t id) { id_ = id; }

// Return true if this pk at this version have been deleted
bool Filter(pk_type& pk, std::int64_t version);
bool Filter(pk_type& pk, int64_t version, int64_t max_version = INT64_MAX);

// Return true if this pk have been deleted
bool Filter(pk_type& pk);
Expand Down
2 changes: 2 additions & 0 deletions cpp/src/filter/filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ class Filter {
return Status::OK();
}

virtual ~Filter(){};

protected:
std::string column_name_;
};
Expand Down
45 changes: 44 additions & 1 deletion cpp/src/filter/value.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ std::string Value::get_value() const {
return string_value_;
}

// TODO: add cast
bool Value::operator==(const Value& other) const { return TemplateBooleanOperation<Equal>(*this, other); }

bool Value::operator!=(const Value& other) const { return TemplateBooleanOperation<NotEqual>(*this, other); }
Expand All @@ -57,4 +58,46 @@ bool Value::operator<=(const Value& other) const { return TemplateBooleanOperati
bool Value::operator>(const Value& other) const { return TemplateBooleanOperation<GreaterThan>(*this, other); }

bool Value::operator<(const Value& other) const { return TemplateBooleanOperation<LessThan>(*this, other); }
} // namespace milvus_storage

Value Value::Bool(bool value) {
Value v;
v.type_ = BOOLEAN;
v.value_.bool_value_ = value;
return v;
}

Value Value::Int32(int32_t value) {
Value v;
v.type_ = INT32;
v.value_.int32_value_ = value;
return v;
}

Value Value::Int64(int64_t value) {
Value v;
v.type_ = INT64;
v.value_.int64_value_ = value;
return v;
}

Value Value::Float(float value) {
Value v;
v.type_ = FLOAT;
v.value_.float_value_ = value;
return v;
}

Value Value::Double(double value) {
Value v;
v.type_ = DOUBLE;
v.value_.double_value_ = value;
return v;
}

Value Value::String(std::string value) {
Value v;
v.type_ = STRING;
v.string_value_ = value;
return v;
}
} // namespace milvus_storage
24 changes: 17 additions & 7 deletions cpp/src/filter/value.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@ enum LogicType {
};
class Value {
public:
Value(int32_t value) { value_.int32_value_ = value; } // NOLINT
Value(int64_t value) { value_.int64_value_ = value; } // NOLINT
Value(bool value) { value_.bool_value_ = value; } // NOLINT
Value(float value) { value_.float_value_ = value; } // NOLINT
Value(double value) { value_.double_value_ = value; } // NOLINT
explicit Value(LogicType type) : type_(type) {} // NOLINT
Value() = default;
Value(int32_t value) : type_(INT32) { value_.int32_value_ = value; } // NOLINT
Value(int64_t value) : type_(INT64) { value_.int64_value_ = value; } // NOLINT
Value(bool value) : type_(BOOLEAN) { value_.bool_value_ = value; } // NOLINT
Value(float value) : type_(FLOAT) { value_.float_value_ = value; } // NOLINT
Value(double value) : type_(DOUBLE) { value_.double_value_ = value; } // NOLINT
Value(std::string value) : type_(STRING) { string_value_ = value; } // NOLINT
explicit Value(LogicType type) : type_(type) {} // NOLINT

LogicType get_logic_type() const { return type_; }

Expand All @@ -36,6 +38,13 @@ class Value {
bool operator>(const Value& other) const;
bool operator<(const Value& other) const;

static Value Bool(bool value);
static Value Int32(int32_t value);
static Value Int64(int64_t value);
static Value Float(float value);
static Value Double(double value);
static Value String(std::string value);

private:
union Val {
bool bool_value_;
Expand Down Expand Up @@ -142,4 +151,5 @@ static bool TemplateBooleanOperation(const Value& a, const Value& b) {
}
return false;
}
} // namespace milvus_storage

} // namespace milvus_storage
12 changes: 7 additions & 5 deletions cpp/src/reader/common/delete_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
namespace milvus_storage {
std::shared_ptr<DeleteMergeReader> DeleteMergeReader::Make(std::shared_ptr<arrow::RecordBatchReader> reader,
std::shared_ptr<SchemaOptions> schema_options,
const DeleteFragmentVector& delete_fragments) {
const DeleteFragmentVector& delete_fragments,
std::shared_ptr<ReadOptions> options) {
// DeleteFragmentVector filtered_delete_fragments;
// for (auto& delete_fragment : delete_fragments) {
// if (schema_options->has_version_column() || delete_fragment.id() > fragment_id) {
Expand All @@ -12,16 +13,16 @@ std::shared_ptr<DeleteMergeReader> DeleteMergeReader::Make(std::shared_ptr<arrow
// filtered_delete_fragments.push_back(delete_fragment);
// }
// }
return std::make_shared<DeleteMergeReader>(reader, delete_fragments, schema_options);
return std::make_shared<DeleteMergeReader>(reader, delete_fragments, schema_options, options);
}

std::shared_ptr<arrow::Schema> DeleteMergeReader::schema() const { return reader_->schema(); }

arrow::Status DeleteMergeReader::ReadNext(std::shared_ptr<arrow::RecordBatch>* batch) {
while (true) {
if (!filtered_batch_reader_) {
if (filtered_batch_reader_) {
auto b = filtered_batch_reader_->Next();
if (!b) {
if (b) {
*batch = b;
return arrow::Status::OK();
}
Expand All @@ -40,7 +41,8 @@ arrow::Status DeleteMergeReader::ReadNext(std::shared_ptr<arrow::RecordBatch>* b
if (version_col == nullptr) {
return arrow::Status::Invalid("Version column not found");
}
auto visitor = DeleteFilterVisitor(delete_fragments_, std::static_pointer_cast<arrow::Int64Array>(version_col));
auto visitor = DeleteFilterVisitor(delete_fragments_, std::static_pointer_cast<arrow::Int64Array>(version_col),
options_->version);

auto pk_col = record_batch->GetColumnByName(schema_options_->primary_column);
if (pk_col == nullptr) {
Expand Down
19 changes: 13 additions & 6 deletions cpp/src/reader/common/delete_reader.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once

#include <memory>
#include <utility>
#include "arrow/record_batch.h"
#include "arrow/array/array_primitive.h"
Expand All @@ -17,23 +18,27 @@ class DeleteMergeReader : public arrow::RecordBatchReader {

static std::shared_ptr<DeleteMergeReader> Make(std::shared_ptr<arrow::RecordBatchReader> reader,
std::shared_ptr<SchemaOptions> schema_options,
const DeleteFragmentVector& delete_fragments);
const DeleteFragmentVector& delete_fragments,
std::shared_ptr<ReadOptions> options);
std::shared_ptr<arrow::Schema> schema() const override;

arrow::Status ReadNext(std::shared_ptr<arrow::RecordBatch>* batch) override;

DeleteMergeReader(std::shared_ptr<arrow::RecordBatchReader> reader,
DeleteFragmentVector delete_fragments,
std::shared_ptr<SchemaOptions> schema_options)
std::shared_ptr<SchemaOptions> schema_options,
std::shared_ptr<ReadOptions> options)
: reader_(std::move(reader)),
delete_fragments_(std::move(delete_fragments)),
schema_options_(std::move(schema_options)) {}
schema_options_(std::move(schema_options)),
options_(options) {}

private:
std::shared_ptr<arrow::RecordBatchReader> reader_;
std::shared_ptr<RecordBatchWithDeltedOffsets> filtered_batch_reader_;
DeleteFragmentVector delete_fragments_;
std::shared_ptr<SchemaOptions> schema_options_;
std::shared_ptr<ReadOptions> options_;
};

// RecordBatchWithDeltedOffsets is reader helper to fetch records not deleted without copy
Expand All @@ -54,8 +59,9 @@ class DeleteMergeReader::RecordBatchWithDeltedOffsets {
class DeleteMergeReader::DeleteFilterVisitor : public arrow::ArrayVisitor {
public:
explicit DeleteFilterVisitor(DeleteFragmentVector delete_fragments,
std::shared_ptr<arrow::Int64Array> version_col = nullptr)
: version_col_(std::move(version_col)), delete_fragments_(std::move(delete_fragments)){};
std::shared_ptr<arrow::Int64Array> version_col = nullptr,
int64_t version = -1)
: version_col_(std::move(version_col)), delete_fragments_(std::move(delete_fragments)), version_(version){};

arrow::Status Visit(const arrow::Int64Array& array) override;
arrow::Status Visit(const arrow::StringArray& array) override;
Expand All @@ -69,7 +75,7 @@ class DeleteMergeReader::DeleteFilterVisitor : public arrow::ArrayVisitor {
pk_type pk = array.Value(i);
for (auto& delete_fragment : delete_fragments_) {
if (version_col_ != nullptr) {
if (delete_fragment.Filter(pk, version_col_->Value(i))) {
if (delete_fragment.Filter(pk, version_col_->Value(i), version_)) {
offsets_.push_back(i);
break;
}
Expand All @@ -88,5 +94,6 @@ class DeleteMergeReader::DeleteFilterVisitor : public arrow::ArrayVisitor {
std::shared_ptr<arrow::Int64Array> version_col_;
DeleteFragmentVector delete_fragments_;
std::vector<int> offsets_;
int64_t version_;
};
} // namespace milvus_storage
4 changes: 3 additions & 1 deletion cpp/src/reader/common/filter_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#include <arrow/type_fwd.h>
#include "arrow/record_batch.h"
#include "arrow/table.h"
#include "common/log.h"

#include <memory>
#include <utility>
Expand All @@ -28,7 +29,7 @@ arrow::Status FilterReader::ReadNext(std::shared_ptr<arrow::RecordBatch>* batch)
current_filtered_batch_reader_ = nullptr;
continue;
}
*batch = std::move(filtered_batch);
*batch = filtered_batch;
return arrow::Status::OK();
}
auto s = NextFilteredBatchReader();
Expand Down Expand Up @@ -85,6 +86,7 @@ arrow::Status FilterReader::NextFilteredBatchReader() {
if (!rec_batch) {
break;
}

filtered_batches = ApplyFilter(rec_batch, option_->filters);
} while (filtered_batches.empty());

Expand Down
2 changes: 1 addition & 1 deletion cpp/src/reader/filter_query_record_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ Result<std::shared_ptr<arrow::RecordBatchReader>> FilterQueryRecordReader::MakeI
CombineOffsetReader::Make(scalar_rec_reader, current_vector_reader, schema_));
ASSIGN_OR_RETURN_NOT_OK(auto filter_reader, FilterReader::Make(combine_reader, options_));
std::shared_ptr<DeleteMergeReader> delete_reader =
DeleteMergeReader::Make(filter_reader, schema_->options(), delete_fragments_);
DeleteMergeReader::Make(filter_reader, schema_->options(), delete_fragments_, options_);
ASSIGN_OR_RETURN_NOT_OK(auto projection_reader, ProjectionReader::Make(schema_->schema(), delete_reader, options_));

next_pos_++;
Expand Down
6 changes: 3 additions & 3 deletions cpp/src/reader/merge_record_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ MergeRecordReader::MergeRecordReader(std::shared_ptr<ReadOptions> options,
std::shared_ptr<arrow::fs::FileSystem> fs,
std::shared_ptr<Schema> schema)
: schema_(schema), fs_(fs), options_(options), delete_fragments_(delete_fragments) {
scalar_reader_ = std::make_shared<MultiFilesSequentialReader>(fs, scalar_fragments, schema->scalar_schema());
vector_reader_ = std::make_shared<MultiFilesSequentialReader>(fs, vector_fragments, schema->vector_schema());
scalar_reader_ = std::make_shared<MultiFilesSequentialReader>(fs, scalar_fragments, schema->scalar_schema(), options);
vector_reader_ = std::make_shared<MultiFilesSequentialReader>(fs, vector_fragments, schema->vector_schema(), options);
}

std::shared_ptr<arrow::Schema> MergeRecordReader::schema() const {
Expand Down Expand Up @@ -61,7 +61,7 @@ arrow::Status MergeRecordReader::ReadNext(std::shared_ptr<arrow::RecordBatch>* b

Result<std::shared_ptr<arrow::RecordBatchReader>> MergeRecordReader::MakeInnerReader() {
ASSIGN_OR_RETURN_NOT_OK(auto combine_reader, CombineReader::Make(scalar_reader_, vector_reader_, schema_));
auto delete_reader = DeleteMergeReader::Make(combine_reader, schema_->options(), delete_fragments_);
auto delete_reader = DeleteMergeReader::Make(combine_reader, schema_->options(), delete_fragments_, options_);
ASSIGN_OR_RETURN_NOT_OK(auto res, ProjectionReader::Make(schema(), delete_reader, options_));
return res;
}
Expand Down
9 changes: 5 additions & 4 deletions cpp/src/reader/multi_files_sequential_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@ namespace milvus_storage {

MultiFilesSequentialReader::MultiFilesSequentialReader(std::shared_ptr<arrow::fs::FileSystem> fs,
const FragmentVector& fragments,
std::shared_ptr<arrow::Schema> schema)
: fs_(fs), schema_(std::move(schema)) {
std::shared_ptr<arrow::Schema> schema,
std::shared_ptr<ReadOptions> options)
: fs_(fs), schema_(std::move(schema)), options_(options) {
for (const auto& fragment : fragments) {
files_.insert(files_.end(), fragment.files().begin(), fragment.files().end());
}
Expand All @@ -26,13 +27,13 @@ arrow::Status MultiFilesSequentialReader::ReadNext(std::shared_ptr<arrow::Record
return arrow::Status::OK();
}

auto s = MakeArrowFileReader(fs_, files_[next_pos_]);
auto s = MakeArrowFileReader(fs_, files_[next_pos_++]);
if (!s.ok()) {
return arrow::Status::UnknownError(s.status().ToString());
}
holding_file_reader_ = s.value();

auto s2 = MakeArrowRecordBatchReader(holding_file_reader_, ReadOptions::default_read_options());
auto s2 = MakeArrowRecordBatchReader(holding_file_reader_, *options_);
if (!s2.ok()) {
return arrow::Status::UnknownError(s2.status().ToString());
}
Expand Down
6 changes: 4 additions & 2 deletions cpp/src/reader/multi_files_sequential_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ class MultiFilesSequentialReader : public arrow::RecordBatchReader {
public:
MultiFilesSequentialReader(std::shared_ptr<arrow::fs::FileSystem> fs,
const FragmentVector& fragments,
std::shared_ptr<arrow::Schema> schema);
std::shared_ptr<arrow::Schema> schema,
std::shared_ptr<ReadOptions> options);

std::shared_ptr<arrow::Schema> schema() const override;

Expand All @@ -24,10 +25,11 @@ class MultiFilesSequentialReader : public arrow::RecordBatchReader {
std::shared_ptr<arrow::Schema> schema_;
std::vector<std::string> files_;

size_t next_pos_;
size_t next_pos_ = 0;
std::shared_ptr<arrow::RecordBatchReader> curr_reader_;
std::shared_ptr<parquet::arrow::FileReader>
holding_file_reader_; // file reader have to outlive than record batch reader, so we hold here.
std::shared_ptr<ReadOptions> options_;

friend FilterQueryRecordReader;
};
Expand Down
16 changes: 13 additions & 3 deletions cpp/src/reader/scan_record_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,22 @@ std::shared_ptr<arrow::Schema> ScanRecordReader::schema() const {
}
return r.value();
}
arrow::Status ScanRecordReader::ReadNext(std::shared_ptr<arrow::RecordBatch>* batch) {}
arrow::Status ScanRecordReader::ReadNext(std::shared_ptr<arrow::RecordBatch>* batch) {
if (reader_ == nullptr) {
auto res = MakeInnerReader();
if (!res.ok()) {
return arrow::Status::UnknownError(res.status().ToString());
}
reader_ = res.value();
}

return reader_->ReadNext(batch);
}

Result<std::shared_ptr<arrow::RecordBatchReader>> ScanRecordReader::MakeInnerReader() {
auto reader = std::make_shared<MultiFilesSequentialReader>(fs_, fragments_, schema_->schema());
auto reader = std::make_shared<MultiFilesSequentialReader>(fs_, fragments_, schema_->schema(), options_);
ASSIGN_OR_RETURN_NOT_OK(auto filter_reader, FilterReader::Make(reader, options_));
auto delete_reader = DeleteMergeReader::Make(filter_reader, schema_->options(), delete_fragments_);
auto delete_reader = DeleteMergeReader::Make(filter_reader, schema_->options(), delete_fragments_, options_);
ASSIGN_OR_RETURN_NOT_OK(auto res, ProjectionReader::Make(schema_->schema(), delete_reader, options_));
return res;
}
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/storage/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ struct ReadOptions {
FilterSet filters;
std::vector<std::string> columns; // must have pk and version
int limit = -1;
int version = -1;
int64_t version = -1;

static ReadOptions& default_read_options() {
static ReadOptions options;
Expand Down
Loading

0 comments on commit c59da41

Please sign in to comment.