Skip to content

Commit

Permalink
[Cpp]: redesign interfaces and rewrite (#104)
Browse files Browse the repository at this point in the history
Signed-off-by: sunby <[email protected]>
  • Loading branch information
sunby committed Jan 2, 2024
1 parent ebd0b8e commit 75720c3
Show file tree
Hide file tree
Showing 31 changed files with 163 additions and 195 deletions.
2 changes: 1 addition & 1 deletion cpp/cmake/libopendal.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ function(build_opendal)
CONFIGURE_COMMAND echo "configure for opendal_ep"
BUILD_COMMAND cargo build ${OPENDAL_BUILD_OPTS}
BUILD_IN_SOURCE 1
INSTALL_COMMAND bash -c "cp ${OPENDAL_PREFIX}/src/opendal_ep/target/${OPENDAL_BUILD_TYPE}/${OPENDAL_NAME} ${OPENDAL_PREFIX}/lib/ && cp ${OPENDAL_PREFIX}/src/opendal_ep/bindings/c/include/opendal.h ${OPENDAL_PREFIX}/include/")
INSTALL_COMMAND bash -c "cp ${OPENDAL_PREFIX}/src/opendal_ep/bindings/c/target/${OPENDAL_BUILD_TYPE}/${OPENDAL_NAME} ${OPENDAL_PREFIX}/lib/ && cp ${OPENDAL_PREFIX}/src/opendal_ep/bindings/c/include/opendal.h ${OPENDAL_PREFIX}/include/")


add_library(opendal STATIC IMPORTED)
Expand Down
2 changes: 1 addition & 1 deletion cpp/include/milvus-storage/common/arrow_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,5 @@ Result<std::shared_ptr<parquet::arrow::FileReader>> MakeArrowFileReader(std::sha

Result<std::shared_ptr<arrow::RecordBatchReader>> MakeArrowRecordBatchReader(
std::shared_ptr<parquet::arrow::FileReader> reader,
std::shared_ptr<ReadOptions> options = ReadOptions::default_read_options());
const ReadOptions& options = ReadOptions());
} // namespace milvus_storage
14 changes: 7 additions & 7 deletions cpp/include/milvus-storage/filter/conjunction_filter.h
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
// Copyright 2023 Zilliz
//
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//
// http://www.apache.org/licenses/LICENSE-2.0
//
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand All @@ -21,27 +21,27 @@ namespace milvus_storage {

class ConjunctionOrFilter : public Filter {
public:
explicit ConjunctionOrFilter(std::vector<std::unique_ptr<Filter>>& filters, std::string column_name)
explicit ConjunctionOrFilter(const FilterSet& filters, std::string column_name)
: Filter(std::move(column_name)), filters_(filters) {}

bool CheckStatistics(parquet::Statistics* stats) override;

Status Apply(arrow::Array* col_data, filter_mask& bitset) override;

private:
std::vector<std::unique_ptr<Filter>>& filters_;
const FilterSet& filters_;
};

class ConjunctionAndFilter : public Filter {
public:
explicit ConjunctionAndFilter(std::vector<std::unique_ptr<Filter>>& filters, std::string column_name)
explicit ConjunctionAndFilter(const FilterSet& filters, std::string column_name)
: Filter(std::move(column_name)), filters_(filters) {}

bool CheckStatistics(parquet::Statistics* stats) override;

Status Apply(arrow::Array* col_data, filter_mask& bitset) override;

private:
std::vector<std::unique_ptr<Filter>>& filters_;
const FilterSet& filters_;
};
} // namespace milvus_storage
3 changes: 2 additions & 1 deletion cpp/include/milvus-storage/filter/filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ namespace milvus_storage {
using filter_mask = std::bitset<kReadBatchSize>;
class Filter {
public:
using FilterSet = std::vector<Filter*>;
explicit Filter(std::string column_name) : column_name_(std::move(column_name)) {}

virtual bool CheckStatistics(parquet::Statistics*) = 0;
Expand All @@ -36,7 +37,7 @@ class Filter {
virtual Status Apply(arrow::Array* col_data, filter_mask& bitset) = 0;

static Status ApplyFilter(const std::shared_ptr<arrow::RecordBatch>& record_batch,
std::vector<std::unique_ptr<Filter>>& filters,
const FilterSet& filters,
filter_mask& bitset) {
for (auto& filter : filters) {
auto col_data = record_batch->GetColumnByName(filter->get_column_name());
Expand Down
3 changes: 1 addition & 2 deletions cpp/include/milvus-storage/format/parquet/file_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,13 @@ namespace milvus_storage {

class ParquetFileReader : public Reader {
public:
ParquetFileReader(std::shared_ptr<parquet::arrow::FileReader> reader, std::shared_ptr<ReadOptions>& options);
ParquetFileReader(std::shared_ptr<parquet::arrow::FileReader> reader);

void Close() override {}

Result<std::shared_ptr<arrow::Table>> ReadByOffsets(std::vector<int64_t>& offsets) override;

private:
std::shared_ptr<parquet::arrow::FileReader> reader_;
std::shared_ptr<ReadOptions> options_;
};
} // namespace milvus_storage
12 changes: 6 additions & 6 deletions cpp/include/milvus-storage/reader/common/delete_reader.h
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
// Copyright 2023 Zilliz
//
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//
// http://www.apache.org/licenses/LICENSE-2.0
//
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand Down Expand Up @@ -33,15 +33,15 @@ class DeleteMergeReader : public arrow::RecordBatchReader {
static std::shared_ptr<DeleteMergeReader> Make(std::shared_ptr<arrow::RecordBatchReader> reader,
std::shared_ptr<SchemaOptions> schema_options,
const DeleteFragmentVector& delete_fragments,
std::shared_ptr<ReadOptions> options);
const ReadOptions& options);
std::shared_ptr<arrow::Schema> schema() const override;

arrow::Status ReadNext(std::shared_ptr<arrow::RecordBatch>* batch) override;

DeleteMergeReader(std::shared_ptr<arrow::RecordBatchReader> reader,
DeleteFragmentVector delete_fragments,
std::shared_ptr<SchemaOptions> schema_options,
std::shared_ptr<ReadOptions> options)
const ReadOptions& options)
: reader_(std::move(reader)),
delete_fragments_(std::move(delete_fragments)),
schema_options_(std::move(schema_options)),
Expand All @@ -52,7 +52,7 @@ class DeleteMergeReader : public arrow::RecordBatchReader {
std::shared_ptr<RecordBatchWithDeltedOffsets> filtered_batch_reader_;
DeleteFragmentVector delete_fragments_;
std::shared_ptr<SchemaOptions> schema_options_;
std::shared_ptr<ReadOptions> options_;
const ReadOptions options_;
};

// RecordBatchWithDeltedOffsets is reader helper to fetch records not deleted without copy
Expand Down
8 changes: 4 additions & 4 deletions cpp/include/milvus-storage/reader/common/filter_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,16 @@ class FilterReader : public arrow::RecordBatchReader {
arrow::Status ReadNext(std::shared_ptr<arrow::RecordBatch>* batch) override;

static Result<std::shared_ptr<FilterReader>> Make(std::shared_ptr<arrow::RecordBatchReader> reader,
std::shared_ptr<ReadOptions> option);
const ReadOptions& option);

FilterReader(std::shared_ptr<arrow::RecordBatchReader> reader, std::shared_ptr<ReadOptions> option)
: record_reader_(std::move(reader)), option_(std::move(option)) {}
FilterReader(std::shared_ptr<arrow::RecordBatchReader> reader, const ReadOptions& option)
: record_reader_(std::move(reader)), option_(option) {}

private:
arrow::Status NextFilteredBatchReader();

std::shared_ptr<arrow::RecordBatchReader> record_reader_;
std::shared_ptr<ReadOptions> option_;
const ReadOptions& option_;
std::shared_ptr<arrow::RecordBatchReader> current_filtered_batch_reader_;
};
} // namespace milvus_storage
6 changes: 3 additions & 3 deletions cpp/include/milvus-storage/reader/common/projection_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,15 @@ class ProjectionReader : public arrow::RecordBatchReader {

static Result<std::shared_ptr<arrow::RecordBatchReader>> Make(std::shared_ptr<arrow::Schema> schema,
std ::shared_ptr<arrow::RecordBatchReader> reader,
std::shared_ptr<ReadOptions> options);
const ReadOptions& options);

ProjectionReader(std::shared_ptr<arrow::Schema> schema,
std ::shared_ptr<arrow::RecordBatchReader> reader,
std::shared_ptr<ReadOptions> options);
const ReadOptions& options);

private:
std::shared_ptr<arrow::RecordBatchReader> reader_;
std::shared_ptr<ReadOptions> options_;
const ReadOptions options_;
std::shared_ptr<arrow::Schema> schema_;
};
} // namespace milvus_storage
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,11 @@
#include <parquet/arrow/reader.h>
#include "file/delete_fragment.h"
#include "file/fragment.h"
#include "format/parquet/file_reader.h"
#include "reader/multi_files_sequential_reader.h"
#include "storage/space.h"
namespace milvus_storage {

class FilterQueryRecordReader : public arrow::RecordBatchReader {
public:
FilterQueryRecordReader(std::shared_ptr<ReadOptions> options,
FilterQueryRecordReader(const ReadOptions& options,
const FragmentVector& scalar_fragments,
const FragmentVector& vector_fragments,
const DeleteFragmentVector& delete_fragments,
Expand All @@ -42,7 +39,7 @@ class FilterQueryRecordReader : public arrow::RecordBatchReader {

std::shared_ptr<arrow::fs::FileSystem> fs_;
std::shared_ptr<Schema> schema_;
std::shared_ptr<ReadOptions> options_;
const ReadOptions options_;
DeleteFragmentVector delete_fragments_;

std::vector<std::string> scalar_files_;
Expand Down
5 changes: 2 additions & 3 deletions cpp/include/milvus-storage/reader/merge_record_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
#include "file/delete_fragment.h"
#include "file/fragment.h"
#include "storage/options.h"
#include "storage/space.h"
namespace milvus_storage {

// MergeRecordReader is to scan files to get records and merge them together.
Expand All @@ -27,7 +26,7 @@ namespace milvus_storage {
// \ FileReader(scalar)
class MergeRecordReader : public arrow::RecordBatchReader {
public:
explicit MergeRecordReader(std::shared_ptr<ReadOptions> options,
explicit MergeRecordReader(const ReadOptions& options,
const FragmentVector& scalar_fragments,
const FragmentVector& vector_fragments,
const DeleteFragmentVector& delete_fragments,
Expand All @@ -43,7 +42,7 @@ class MergeRecordReader : public arrow::RecordBatchReader {

std::shared_ptr<arrow::fs::FileSystem> fs_;
std::shared_ptr<Schema> schema_;
std::shared_ptr<ReadOptions> options_;
const ReadOptions options_;

std::shared_ptr<arrow::RecordBatchReader> scalar_reader_;
std::shared_ptr<arrow::RecordBatchReader> vector_reader_;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
#include <parquet/arrow/reader.h>
#include "file/fragment.h"
#include "storage/space.h"
#include "reader/multi_files_sequential_reader.h"

namespace milvus_storage {

Expand All @@ -28,7 +27,7 @@ class MultiFilesSequentialReader : public arrow::RecordBatchReader {
MultiFilesSequentialReader(std::shared_ptr<arrow::fs::FileSystem> fs,
const FragmentVector& fragments,
std::shared_ptr<arrow::Schema> schema,
std::shared_ptr<ReadOptions> options);
const ReadOptions& options);

std::shared_ptr<arrow::Schema> schema() const override;

Expand All @@ -43,7 +42,7 @@ class MultiFilesSequentialReader : public arrow::RecordBatchReader {
std::shared_ptr<arrow::RecordBatchReader> curr_reader_;
std::shared_ptr<parquet::arrow::FileReader>
holding_file_reader_; // file reader have to outlive than record batch reader, so we hold here.
std::shared_ptr<ReadOptions> options_;
const ReadOptions options_;

friend FilterQueryRecordReader;
};
Expand Down
37 changes: 18 additions & 19 deletions cpp/include/milvus-storage/reader/record_reader.h
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
// Copyright 2023 Zilliz
//
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//
// http://www.apache.org/licenses/LICENSE-2.0
//
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand All @@ -15,27 +15,26 @@
#pragma once

#include "file/delete_fragment.h"
#include "reader/filter_query_record_reader.h"
#include "storage/manifest.h"
namespace milvus_storage {

struct RecordReader {
static std::unique_ptr<arrow::RecordBatchReader> MakeRecordReader(std::shared_ptr<Manifest> manifest,
std::shared_ptr<Schema> schema,
std::shared_ptr<arrow::fs::FileSystem> fs,
DeleteFragmentVector delete_fragments,
std::shared_ptr<ReadOptions>& options);
namespace internal {
std::unique_ptr<arrow::RecordBatchReader> MakeRecordReader(std::shared_ptr<Manifest> manifest,
std::shared_ptr<Schema> schema,
std::shared_ptr<arrow::fs::FileSystem> fs,
DeleteFragmentVector delete_fragments,
const ReadOptions& options);

static bool only_contain_scalar_columns(std::shared_ptr<Schema> schema, const std::set<std::string>& related_columns);
bool only_contain_scalar_columns(std::shared_ptr<Schema> schema, const std::set<std::string>& related_columns);

static bool only_contain_vector_columns(std::shared_ptr<Schema> schema, const std::set<std::string>& related_columns);
bool only_contain_vector_columns(std::shared_ptr<Schema> schema, const std::set<std::string>& related_columns);

static bool filters_only_contain_pk_and_version(std::shared_ptr<Schema> schema,
const std::vector<std::unique_ptr<Filter>>& filters);
bool filters_only_contain_pk_and_version(std::shared_ptr<Schema> schema, const Filter::FilterSet& filters);

static Result<std::shared_ptr<arrow::RecordBatchReader>> MakeScanDataReader(
std::shared_ptr<Manifest> manifest, std::shared_ptr<arrow::fs::FileSystem> fs);
Result<std::shared_ptr<arrow::RecordBatchReader>> MakeScanDataReader(std::shared_ptr<Manifest> manifest,
std::shared_ptr<arrow::fs::FileSystem> fs);

static std::shared_ptr<arrow::RecordBatchReader> MakeScanDeleteReader(std::shared_ptr<Manifest> manifest,
std::shared_ptr<arrow::fs::FileSystem> fs);
};
std::shared_ptr<arrow::RecordBatchReader> MakeScanDeleteReader(std::shared_ptr<Manifest> manifest,
std::shared_ptr<arrow::fs::FileSystem> fs);
} // namespace internal
} // namespace milvus_storage
4 changes: 2 additions & 2 deletions cpp/include/milvus-storage/reader/scan_record_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ namespace milvus_storage {
class ScanRecordReader : public arrow::RecordBatchReader {
public:
ScanRecordReader(std::shared_ptr<Schema> schema,
std::shared_ptr<ReadOptions> options,
const ReadOptions& options,
std::shared_ptr<arrow::fs::FileSystem> fs,
const FragmentVector& fragments,
const DeleteFragmentVector& delete_fragments);
Expand All @@ -35,7 +35,7 @@ class ScanRecordReader : public arrow::RecordBatchReader {
Result<std::shared_ptr<arrow::RecordBatchReader>> MakeInnerReader();

std::shared_ptr<Schema> schema_;
std::shared_ptr<ReadOptions> options_;
const ReadOptions options_;
std::shared_ptr<arrow::fs::FileSystem> fs_;
const FragmentVector fragments_;
const DeleteFragmentVector delete_fragments_;
Expand Down
6 changes: 3 additions & 3 deletions cpp/include/milvus-storage/storage/manifest.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,11 @@ class Manifest {

[[nodiscard]] const FragmentVector& delete_fragments() const;

bool has_blob(std::string& name);
bool has_blob(const std::string& name);

void remove_blob_if_exist(std::string& name);
void remove_blob_if_exist(const std::string& name);

Result<Blob> get_blob(std::string& name);
Result<Blob> get_blob(const std::string& name);

[[nodiscard]] const std::vector<Blob>& blobs() const;

Expand Down
19 changes: 6 additions & 13 deletions cpp/include/milvus-storage/storage/options.h
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
// Copyright 2023 Zilliz
//
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//
// http://www.apache.org/licenses/LICENSE-2.0
//
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand All @@ -14,6 +14,7 @@

#pragma once

#include <vector>
#include <memory>
#include "filter/filter.h"
#include "proto/manifest.pb.h"
Expand All @@ -30,20 +31,12 @@ struct WriteOption {
int64_t max_record_per_file = 1024;
};

using FilterSet = std::vector<std::unique_ptr<Filter>>;
struct ReadOptions {
FilterSet filters;
Filter::FilterSet filters;

std::vector<std::string> columns; // must have pk and version
// int limit = -1;
int64_t version = INT64_MAX;

static std::shared_ptr<ReadOptions> default_read_options() {
static std::shared_ptr<ReadOptions> options = std::make_shared<ReadOptions>();
return options;
}

std::vector<std::string> output_columns() { return columns; }
bool has_version() { return version != -1; }
};

struct SchemaOptions {
Expand Down
Loading

0 comments on commit 75720c3

Please sign in to comment.