Skip to content

Commit

Permalink
[Cpp]: change reader to unique ptr (#108)
Browse files Browse the repository at this point in the history
Signed-off-by: sunby <[email protected]>
  • Loading branch information
sunby committed Jan 3, 2024
1 parent 3c84114 commit 4440a90
Show file tree
Hide file tree
Showing 20 changed files with 100 additions and 101 deletions.
13 changes: 6 additions & 7 deletions cpp/include/milvus-storage/common/arrow_util.h
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
// Copyright 2023 Zilliz
//
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//
// http://www.apache.org/licenses/LICENSE-2.0
//
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand All @@ -20,10 +20,9 @@
#include "storage/options.h"

namespace milvus_storage {
Result<std::shared_ptr<parquet::arrow::FileReader>> MakeArrowFileReader(arrow::fs::FileSystem& fs,
Result<std::unique_ptr<parquet::arrow::FileReader>> MakeArrowFileReader(arrow::fs::FileSystem& fs,
const std::string& file_path);

Result<std::shared_ptr<arrow::RecordBatchReader>> MakeArrowRecordBatchReader(
std::shared_ptr<parquet::arrow::FileReader> reader,
const ReadOptions& options = ReadOptions());
Result<std::unique_ptr<arrow::RecordBatchReader>> MakeArrowRecordBatchReader(
parquet::arrow::FileReader& reader, const ReadOptions& options = ReadOptions());
} // namespace milvus_storage
4 changes: 2 additions & 2 deletions cpp/include/milvus-storage/format/parquet/file_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,13 @@ namespace milvus_storage {

class ParquetFileReader : public Reader {
public:
ParquetFileReader(std::shared_ptr<parquet::arrow::FileReader> reader);
ParquetFileReader(std::unique_ptr<parquet::arrow::FileReader> reader);

void Close() override {}

Result<std::shared_ptr<arrow::Table>> ReadByOffsets(std::vector<int64_t>& offsets) override;

private:
std::shared_ptr<parquet::arrow::FileReader> reader_;
std::unique_ptr<parquet::arrow::FileReader> reader_;
};
} // namespace milvus_storage
12 changes: 6 additions & 6 deletions cpp/include/milvus-storage/reader/common/combine_offset_reader.h
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
// Copyright 2023 Zilliz
//
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//
// http://www.apache.org/licenses/LICENSE-2.0
//
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand All @@ -24,9 +24,9 @@ namespace milvus_storage {
// of another file and combines them together.
class CombineOffsetReader : public arrow::RecordBatchReader {
public:
static Result<std::shared_ptr<CombineOffsetReader>> Make(std::shared_ptr<arrow::RecordBatchReader> scalar_reader,
std::shared_ptr<ParquetFileReader> vector_reader,
std::shared_ptr<Schema> schema);
static std::unique_ptr<CombineOffsetReader> Make(std::unique_ptr<arrow::RecordBatchReader> scalar_reader,
std::unique_ptr<ParquetFileReader> vector_reader,
std::shared_ptr<Schema> schema);

std::shared_ptr<arrow::Schema> schema() const override;

Expand Down
6 changes: 3 additions & 3 deletions cpp/include/milvus-storage/reader/common/delete_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,15 @@ class DeleteMergeReader : public arrow::RecordBatchReader {
class RecordBatchWithDeltedOffsets;
class DeleteFilterVisitor;

static std::shared_ptr<DeleteMergeReader> Make(std::shared_ptr<arrow::RecordBatchReader> reader,
static std::unique_ptr<DeleteMergeReader> Make(std::unique_ptr<arrow::RecordBatchReader> reader,
std::shared_ptr<SchemaOptions> schema_options,
const DeleteFragmentVector& delete_fragments,
const ReadOptions& options);
std::shared_ptr<arrow::Schema> schema() const override;

arrow::Status ReadNext(std::shared_ptr<arrow::RecordBatch>* batch) override;

DeleteMergeReader(std::shared_ptr<arrow::RecordBatchReader> reader,
DeleteMergeReader(std::unique_ptr<arrow::RecordBatchReader> reader,
DeleteFragmentVector delete_fragments,
std::shared_ptr<SchemaOptions> schema_options,
const ReadOptions& options)
Expand All @@ -48,7 +48,7 @@ class DeleteMergeReader : public arrow::RecordBatchReader {
options_(options) {}

private:
std::shared_ptr<arrow::RecordBatchReader> reader_;
std::unique_ptr<arrow::RecordBatchReader> reader_;
std::shared_ptr<RecordBatchWithDeltedOffsets> filtered_batch_reader_;
DeleteFragmentVector delete_fragments_;
std::shared_ptr<SchemaOptions> schema_options_;
Expand Down
6 changes: 3 additions & 3 deletions cpp/include/milvus-storage/reader/common/filter_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,16 @@ class FilterReader : public arrow::RecordBatchReader {

arrow::Status ReadNext(std::shared_ptr<arrow::RecordBatch>* batch) override;

static Result<std::shared_ptr<FilterReader>> Make(std::shared_ptr<arrow::RecordBatchReader> reader,
static std::unique_ptr<FilterReader> Make(std::unique_ptr<arrow::RecordBatchReader> reader,
const ReadOptions& option);

FilterReader(std::shared_ptr<arrow::RecordBatchReader> reader, const ReadOptions& option)
FilterReader(std::unique_ptr<arrow::RecordBatchReader> reader, const ReadOptions& option)
: record_reader_(std::move(reader)), option_(option) {}

private:
arrow::Status NextFilteredBatchReader();

std::shared_ptr<arrow::RecordBatchReader> record_reader_;
std::unique_ptr<arrow::RecordBatchReader> record_reader_;
const ReadOptions& option_;
std::shared_ptr<arrow::RecordBatchReader> current_filtered_batch_reader_;
};
Expand Down
14 changes: 7 additions & 7 deletions cpp/include/milvus-storage/reader/common/projection_reader.h
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
// Copyright 2023 Zilliz
//
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//
// http://www.apache.org/licenses/LICENSE-2.0
//
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand All @@ -26,16 +26,16 @@ class ProjectionReader : public arrow::RecordBatchReader {

arrow::Status ReadNext(std::shared_ptr<arrow::RecordBatch>* batch) override;

static Result<std::shared_ptr<arrow::RecordBatchReader>> Make(std::shared_ptr<arrow::Schema> schema,
std ::shared_ptr<arrow::RecordBatchReader> reader,
static Result<std::unique_ptr<arrow::RecordBatchReader>> Make(std::shared_ptr<arrow::Schema> schema,
std::unique_ptr<arrow::RecordBatchReader> reader,
const ReadOptions& options);

ProjectionReader(std::shared_ptr<arrow::Schema> schema,
std ::shared_ptr<arrow::RecordBatchReader> reader,
std::unique_ptr<arrow::RecordBatchReader> reader,
const ReadOptions& options);

private:
std::shared_ptr<arrow::RecordBatchReader> reader_;
std::unique_ptr<arrow::RecordBatchReader> reader_;
const ReadOptions options_;
std::shared_ptr<arrow::Schema> schema_;
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class FilterQueryRecordReader : public arrow::RecordBatchReader {

private:
// Try to make inner reader, return nullptr if next_pos_ reach the end.
Result<std::shared_ptr<arrow::RecordBatchReader>> MakeInnerReader();
Result<std::unique_ptr<arrow::RecordBatchReader>> MakeInnerReader();

arrow::fs::FileSystem& fs_;
std::shared_ptr<Schema> schema_;
Expand All @@ -46,9 +46,9 @@ class FilterQueryRecordReader : public arrow::RecordBatchReader {
std::vector<std::string> vector_files_;
int64_t next_pos_ = 0;

std::shared_ptr<parquet::arrow::FileReader> holding_scalar_file_reader_;
std::shared_ptr<parquet::arrow::FileReader> holding_vector_file_reader_;
std::unique_ptr<parquet::arrow::FileReader> holding_scalar_file_reader_;
std::unique_ptr<parquet::arrow::FileReader> holding_vector_file_reader_;

std::shared_ptr<arrow::RecordBatchReader> curr_reader_;
std::unique_ptr<arrow::RecordBatchReader> curr_reader_;
};
} // namespace milvus_storage
4 changes: 2 additions & 2 deletions cpp/include/milvus-storage/reader/merge_record_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,15 @@ class MergeRecordReader : public arrow::RecordBatchReader {
arrow::Status ReadNext(std::shared_ptr<arrow::RecordBatch>* batch) override;

private:
Result<std::shared_ptr<arrow::RecordBatchReader>> MakeInnerReader();
Result<std::unique_ptr<arrow::RecordBatchReader>> MakeInnerReader();

arrow::fs::FileSystem& fs_;
std::shared_ptr<Schema> schema_;
const ReadOptions options_;

std::unique_ptr<arrow::RecordBatchReader> scalar_reader_;
std::unique_ptr<arrow::RecordBatchReader> vector_reader_;
std::shared_ptr<arrow::RecordBatchReader> curr_reader_;
std::unique_ptr<arrow::RecordBatchReader> curr_reader_;
const DeleteFragmentVector delete_fragments_;
};
} // namespace milvus_storage
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ class MultiFilesSequentialReader : public arrow::RecordBatchReader {
std::vector<std::string> files_;

size_t next_pos_ = 0;
std::shared_ptr<arrow::RecordBatchReader> curr_reader_;
std::shared_ptr<parquet::arrow::FileReader>
std::unique_ptr<arrow::RecordBatchReader> curr_reader_;
std::unique_ptr<parquet::arrow::FileReader>
holding_file_reader_; // file reader have to outlive than record batch reader, so we hold here.
const ReadOptions options_;

Expand Down
4 changes: 2 additions & 2 deletions cpp/include/milvus-storage/reader/scan_record_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,14 @@ class ScanRecordReader : public arrow::RecordBatchReader {
arrow::Status ReadNext(std::shared_ptr<arrow::RecordBatch>* batch) override;

private:
Result<std::shared_ptr<arrow::RecordBatchReader>> MakeInnerReader();
Result<std::unique_ptr<arrow::RecordBatchReader>> MakeInnerReader();

std::shared_ptr<Schema> schema_;
const ReadOptions options_;
arrow::fs::FileSystem& fs_;
const FragmentVector fragments_;
const DeleteFragmentVector delete_fragments_;
std::shared_ptr<arrow::RecordBatchReader> reader_;
std::unique_ptr<arrow::RecordBatchReader> reader_;
};

} // namespace milvus_storage
14 changes: 7 additions & 7 deletions cpp/src/common/arrow_util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,18 @@
#include "common/macro.h"

namespace milvus_storage {
Result<std::shared_ptr<parquet::arrow::FileReader>> MakeArrowFileReader(arrow::fs::FileSystem& fs,
Result<std::unique_ptr<parquet::arrow::FileReader>> MakeArrowFileReader(arrow::fs::FileSystem& fs,
const std::string& file_path) {
ASSIGN_OR_RETURN_ARROW_NOT_OK(auto file, fs.OpenInputFile(file_path));

std::unique_ptr<parquet::arrow::FileReader> file_reader;
RETURN_ARROW_NOT_OK(parquet::arrow::OpenFile(file, arrow::default_memory_pool(), &file_reader));
return std::shared_ptr(std::move(file_reader));
return std::move(file_reader);
}

Result<std::shared_ptr<arrow::RecordBatchReader>> MakeArrowRecordBatchReader(
std::shared_ptr<parquet::arrow::FileReader> reader, const ReadOptions& options) {
auto metadata = reader->parquet_reader()->metadata();
Result<std::unique_ptr<arrow::RecordBatchReader>> MakeArrowRecordBatchReader(
parquet::arrow::FileReader& reader, const ReadOptions& options) {
auto metadata = reader.parquet_reader()->metadata();
std::vector<int> row_group_indices;
std::vector<int> column_indices;

Expand Down Expand Up @@ -61,10 +61,10 @@ Result<std::shared_ptr<arrow::RecordBatchReader>> MakeArrowRecordBatchReader(
row_group_indices.emplace_back(i);
}
}
std::shared_ptr<arrow::RecordBatchReader> record_reader;
std::unique_ptr<arrow::RecordBatchReader> record_reader;

// RETURN_ARROW_NOT_OK(reader->GetRecordBatchReader(row_group_indices, column_indices, &record_reader));
RETURN_ARROW_NOT_OK(reader->GetRecordBatchReader(row_group_indices, &record_reader));
RETURN_ARROW_NOT_OK(reader.GetRecordBatchReader(row_group_indices, &record_reader));
return record_reader;
}

Expand Down
2 changes: 1 addition & 1 deletion cpp/src/format/parquet/file_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@

namespace milvus_storage {

ParquetFileReader::ParquetFileReader(std::shared_ptr<parquet::arrow::FileReader> reader)
ParquetFileReader::ParquetFileReader(std::unique_ptr<parquet::arrow::FileReader> reader)
: reader_(std::move(reader)) {}

Result<std::shared_ptr<arrow::RecordBatch>> GetRecordAtOffset(arrow::RecordBatchReader* reader, int64_t offset) {
Expand Down
16 changes: 7 additions & 9 deletions cpp/src/reader/common/combine_offset_reader.cpp
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
// Copyright 2023 Zilliz
//
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//
// http://www.apache.org/licenses/LICENSE-2.0
//
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand All @@ -15,12 +15,10 @@
#include "reader/common/combine_offset_reader.h"
#include "arrow/array/array_primitive.h"
namespace milvus_storage {
Result<std::shared_ptr<CombineOffsetReader>> CombineOffsetReader::Make(
std::shared_ptr<arrow::RecordBatchReader> scalar_reader,
std::shared_ptr<ParquetFileReader> vector_reader,
std::shared_ptr<Schema> schema) {
return std::shared_ptr<CombineOffsetReader>(
new CombineOffsetReader(std::move(scalar_reader), std::move(vector_reader), std::move(schema)));
std::unique_ptr<CombineOffsetReader> CombineOffsetReader::Make(std::unique_ptr<arrow::RecordBatchReader> scalar_reader,
std::unique_ptr<ParquetFileReader> vector_reader,
std::shared_ptr<Schema> schema) {
return std::make_unique<CombineOffsetReader>(std::move(scalar_reader), std::move(vector_reader), std::move(schema));
}

std::shared_ptr<arrow::Schema> CombineOffsetReader::schema() const { return schema_->schema(); }
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/reader/common/delete_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
#include "reader/common/delete_reader.h"

namespace milvus_storage {
std::shared_ptr<DeleteMergeReader> DeleteMergeReader::Make(std::shared_ptr<arrow::RecordBatchReader> reader,
std::unique_ptr<DeleteMergeReader> DeleteMergeReader::Make(std::unique_ptr<arrow::RecordBatchReader> reader,
std::shared_ptr<SchemaOptions> schema_options,
const DeleteFragmentVector& delete_fragments,
const ReadOptions& options) {
Expand All @@ -27,7 +27,7 @@ std::shared_ptr<DeleteMergeReader> DeleteMergeReader::Make(std::shared_ptr<arrow
// filtered_delete_fragments.push_back(delete_fragment);
// }
// }
return std::make_shared<DeleteMergeReader>(reader, delete_fragments, schema_options, options);
return std::make_unique<DeleteMergeReader>(std::move(reader), delete_fragments, schema_options, options);
}

std::shared_ptr<arrow::Schema> DeleteMergeReader::schema() const { return reader_->schema(); }
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/reader/common/filter_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@
#include <memory>

namespace milvus_storage {
Result<std::shared_ptr<FilterReader>> FilterReader::Make(std::shared_ptr<arrow::RecordBatchReader> reader,
std::unique_ptr<FilterReader> FilterReader::Make(std::unique_ptr<arrow::RecordBatchReader> reader,
const ReadOptions& option) {
return std::make_shared<FilterReader>(reader, option);
return std::make_unique<FilterReader>(std::move(reader), option);
}

std::shared_ptr<arrow::Schema> FilterReader::schema() const {
Expand Down
16 changes: 8 additions & 8 deletions cpp/src/reader/common/projection_reader.cpp
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
// Copyright 2023 Zilliz
//
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//
// http://www.apache.org/licenses/LICENSE-2.0
//
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand All @@ -23,17 +23,17 @@
namespace milvus_storage {

ProjectionReader::ProjectionReader(std::shared_ptr<arrow::Schema> schema,
std ::shared_ptr<arrow::RecordBatchReader> reader,
std ::unique_ptr<arrow::RecordBatchReader> reader,
const ReadOptions& options)
: reader_(std::move(reader)), options_(options), schema_(schema) {}

Result<std::shared_ptr<arrow::RecordBatchReader>> ProjectionReader::Make(
Result<std::unique_ptr<arrow::RecordBatchReader>> ProjectionReader::Make(
std::shared_ptr<arrow::Schema> schema,
std ::shared_ptr<arrow::RecordBatchReader> reader,
std ::unique_ptr<arrow::RecordBatchReader> reader,
const ReadOptions& options) {
ASSIGN_OR_RETURN_NOT_OK(auto projection_schema, ProjectSchema(schema, options.columns));
std::shared_ptr<arrow::RecordBatchReader> projection_reader =
std::make_shared<ProjectionReader>(projection_schema, reader, options);
std::unique_ptr<arrow::RecordBatchReader> projection_reader =
std::make_unique<ProjectionReader>(projection_schema, std::move(reader), options);
return projection_reader;
}

Expand Down
Loading

0 comments on commit 4440a90

Please sign in to comment.