Skip to content

Commit

Permalink
Add scan interfaces (#49)
Browse files Browse the repository at this point in the history
Signed-off-by: sunby <[email protected]>
  • Loading branch information
sunby committed Sep 5, 2023
1 parent 9875aa9 commit fb6014c
Show file tree
Hide file tree
Showing 12 changed files with 72 additions and 15 deletions.
6 changes: 6 additions & 0 deletions cpp/conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,12 @@ class StorageConan(ConanFile):
"arrow:filesystem_layer": True,
"arrow:dataset_modules": True,
"arrow:parquet": True,
"arrow:with_re2": True,
"arrow:with_zstd": True,
"arrow:with_boost": True,
"arrow:with_thrift": True,
"arrow:with_jemalloc": True,
"boost:without_test": True,
}

exports_sources = (
Expand Down
8 changes: 4 additions & 4 deletions cpp/src/common/arrow_util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,16 @@ Result<std::shared_ptr<parquet::arrow::FileReader>> MakeArrowFileReader(std::sha
}

Result<std::shared_ptr<arrow::RecordBatchReader>> MakeArrowRecordBatchReader(
std::shared_ptr<parquet::arrow::FileReader> reader, const ReadOptions& options) {
std::shared_ptr<parquet::arrow::FileReader> reader, std::shared_ptr<ReadOptions> options) {
auto metadata = reader->parquet_reader()->metadata();
std::vector<int> row_group_indices;
std::vector<int> column_indices;

for (const auto& column_name : options.columns) {
for (const auto& column_name : options->columns) {
auto column_idx = metadata->schema()->ColumnIndex(column_name);
column_indices.emplace_back(column_idx);
}
for (const auto& filter : options.filters) {
for (const auto& filter : options->filters) {
auto column_idx = metadata->schema()->ColumnIndex(filter->get_column_name());
column_indices.emplace_back(column_idx);
}
Expand All @@ -30,7 +30,7 @@ Result<std::shared_ptr<arrow::RecordBatchReader>> MakeArrowRecordBatchReader(
auto row_group_metadata = metadata->RowGroup(i);
bool can_ignored = false;

for (const auto& filter : options.filters) {
for (const auto& filter : options->filters) {
auto column_idx = metadata->schema()->ColumnIndex(filter->get_column_name());
auto column_meta = row_group_metadata->ColumnChunk(column_idx);
auto stats = column_meta->statistics();
Expand Down
3 changes: 2 additions & 1 deletion cpp/src/common/arrow_util.h
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#pragma once
#include <memory>
#include "parquet/arrow/reader.h"
#include "arrow/filesystem/filesystem.h"
#include "common/result.h"
Expand All @@ -10,5 +11,5 @@ Result<std::shared_ptr<parquet::arrow::FileReader>> MakeArrowFileReader(std::sha

Result<std::shared_ptr<arrow::RecordBatchReader>> MakeArrowRecordBatchReader(
std::shared_ptr<parquet::arrow::FileReader> reader,
const ReadOptions& options = ReadOptions::default_read_options());
std::shared_ptr<ReadOptions> options = ReadOptions::default_read_options());
} // namespace milvus_storage
2 changes: 1 addition & 1 deletion cpp/src/reader/filter_query_record_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ Result<std::shared_ptr<arrow::RecordBatchReader>> FilterQueryRecordReader::MakeI
auto scalar_file = scalar_files_[next_pos_], vector_file = vector_files_[next_pos_];
ASSIGN_OR_RETURN_NOT_OK(holding_scalar_file_reader_, MakeArrowFileReader(fs_, scalar_file));
ASSIGN_OR_RETURN_NOT_OK(holding_vector_file_reader_, MakeArrowFileReader(fs_, vector_file));
ASSIGN_OR_RETURN_NOT_OK(auto scalar_rec_reader, MakeArrowRecordBatchReader(holding_scalar_file_reader_, *options_));
ASSIGN_OR_RETURN_NOT_OK(auto scalar_rec_reader, MakeArrowRecordBatchReader(holding_scalar_file_reader_, options_));
auto current_vector_reader = std::make_shared<ParquetFileReader>(holding_vector_file_reader_, options_);

ASSIGN_OR_RETURN_NOT_OK(auto combine_reader,
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/reader/multi_files_sequential_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ arrow::Status MultiFilesSequentialReader::ReadNext(std::shared_ptr<arrow::Record
}
holding_file_reader_ = s.value();

auto s2 = MakeArrowRecordBatchReader(holding_file_reader_, *options_);
auto s2 = MakeArrowRecordBatchReader(holding_file_reader_, options_);
if (!s2.ok()) {
return arrow::Status::UnknownError(s2.status().ToString());
}
Expand Down
20 changes: 20 additions & 0 deletions cpp/src/reader/record_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@
#include <arrow/filesystem/filesystem.h>
#include <cstdint>
#include <memory>
#include "common/macro.h"
#include "file/delete_fragment.h"
#include "file/fragment.h"
#include "reader/common/combine_reader.h"
#include "reader/filter_query_record_reader.h"
#include "reader/merge_record_reader.h"
#include "reader/scan_record_reader.h"
#include "common/utils.h"

namespace milvus_storage {
DeleteFragmentVector FilterDeleteFragments(FragmentVector& data_fragments, DeleteFragmentVector& delete_fragments) {
Expand Down Expand Up @@ -85,4 +88,21 @@ bool RecordReader::filters_only_contain_pk_and_version(std::shared_ptr<Schema> s
return true;
}

// Builds a reader that scans the data files referenced by `manifest`.
//
// Two MultiFilesSequentialReader instances are created, one over the manifest's scalar
// fragments (with the scalar schema) and one over its vector fragments (with the vector
// schema), both using ReadOptions::default_read_options(). They are then joined through
// CombineReader::Make using the manifest's schema.
//
// Returns the combined reader as an arrow::RecordBatchReader; if CombineReader::Make
// does not succeed, ASSIGN_OR_RETURN_NOT_OK returns early from this function.
Result<std::shared_ptr<arrow::RecordBatchReader>> RecordReader::MakeScanDataReader(
    std::shared_ptr<Manifest> manifest, std::shared_ptr<arrow::fs::FileSystem> fs) {
  const auto& scalar_fragments = manifest->scalar_fragments();
  const auto& vector_fragments = manifest->vector_fragments();

  // One sequential reader per file family.
  auto scalar_source = std::make_shared<MultiFilesSequentialReader>(
      fs, scalar_fragments, manifest->schema()->scalar_schema(), ReadOptions::default_read_options());
  auto vector_source = std::make_shared<MultiFilesSequentialReader>(
      fs, vector_fragments, manifest->schema()->vector_schema(), ReadOptions::default_read_options());

  // Join the scalar and vector streams into one reader.
  ASSIGN_OR_RETURN_NOT_OK(auto combined, CombineReader::Make(scalar_source, vector_source, manifest->schema()));
  return std::static_pointer_cast<arrow::RecordBatchReader>(combined);
}

// Builds a reader that scans the delete files referenced by `manifest`.
//
// A single MultiFilesSequentialReader is created over the manifest's delete fragments,
// using the delete schema and ReadOptions::default_read_options(), and is returned
// through the arrow::RecordBatchReader interface.
std::shared_ptr<arrow::RecordBatchReader> RecordReader::MakeScanDeleteReader(
    std::shared_ptr<Manifest> manifest, std::shared_ptr<arrow::fs::FileSystem> fs) {
  const auto& delete_fragments = manifest->delete_fragments();
  return std::make_shared<MultiFilesSequentialReader>(fs, delete_fragments, manifest->schema()->delete_schema(),
                                                      ReadOptions::default_read_options());
}
} // namespace milvus_storage
6 changes: 6 additions & 0 deletions cpp/src/reader/record_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,11 @@ struct RecordReader {

static bool filters_only_contain_pk_and_version(std::shared_ptr<Schema> schema,
const std::vector<std::unique_ptr<Filter>>& filters);

static Result<std::shared_ptr<arrow::RecordBatchReader>> MakeScanDataReader(
std::shared_ptr<Manifest> manifest, std::shared_ptr<arrow::fs::FileSystem> fs);

static std::shared_ptr<arrow::RecordBatchReader> MakeScanDeleteReader(std::shared_ptr<Manifest> manifest,
std::shared_ptr<arrow::fs::FileSystem> fs);
};
} // namespace milvus_storage
2 changes: 2 additions & 0 deletions cpp/src/storage/manifest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ Result<Blob> Manifest::get_blob(std::string& name) {
return *iter;
}

// Read-only access to the blobs held by this manifest (returns a reference to blobs_,
// no copy is made here).
const std::vector<Blob>& Manifest::blobs() const {
  return blobs_;
}

// Returns the version number currently stored in this manifest.
int64_t Manifest::version() const {
  return version_;
}

// Overwrites the version number stored in this manifest with `version`.
void Manifest::set_version(int64_t version) {
  version_ = version;
}
Expand Down
14 changes: 8 additions & 6 deletions cpp/src/storage/manifest.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
#include "storage/schema.h"
#include "file/fragment.h"
#include "arrow/filesystem/filesystem.h"
#include "file/blob.h";
#include "file/blob.h"
namespace milvus_storage {

class Manifest {
Expand All @@ -21,23 +21,25 @@ class Manifest {

void add_blob(Blob&& blob);

const FragmentVector& scalar_fragments() const;
[[nodiscard]] const FragmentVector& scalar_fragments() const;

const FragmentVector& vector_fragments() const;
[[nodiscard]] const FragmentVector& vector_fragments() const;

const FragmentVector& delete_fragments() const;
[[nodiscard]] const FragmentVector& delete_fragments() const;

bool has_blob(std::string& name);

void remove_blob_if_exist(std::string& name);

Result<Blob> get_blob(std::string& name);

int64_t version() const;
[[nodiscard]] const std::vector<Blob>& blobs() const;

[[nodiscard]] int64_t version() const;

void set_version(int64_t version);

Result<manifest_proto::Manifest> ToProtobuf() const;
[[nodiscard]] Result<manifest_proto::Manifest> ToProtobuf() const;

void FromProtobuf(const manifest_proto::Manifest& manifest);

Expand Down
5 changes: 3 additions & 2 deletions cpp/src/storage/options.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once

#include <memory>
#include "filter/filter.h"
#include "proto/manifest.pb.h"

Expand All @@ -22,8 +23,8 @@ struct ReadOptions {
// int limit = -1;
int64_t version = INT64_MAX;

static ReadOptions& default_read_options() {
static ReadOptions options;
static std::shared_ptr<ReadOptions> default_read_options() {
static std::shared_ptr<ReadOptions> options = std::make_shared<ReadOptions>();
return options;
}

Expand Down
10 changes: 10 additions & 0 deletions cpp/src/storage/space.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -295,4 +295,14 @@ Result<arrow::fs::FileInfoVector> Space::FindAllManifest(std::shared_ptr<arrow::
return info_vec;
}

std::vector<Blob> Space::StatisticsBlobs() { return manifest_->blobs(); }

// Scans the delete files of this space.
// Forwards manifest_ and fs_ to RecordReader::MakeScanDeleteReader; the shared_ptr it
// returns is converted into the Result return type by the return statement.
Result<std::shared_ptr<arrow::RecordBatchReader>> Space::ScanDelete() {
return RecordReader::MakeScanDeleteReader(manifest_, fs_);
}

// Scans the data files of this space without filtering deleted data.
// Forwards manifest_ and fs_ to RecordReader::MakeScanDataReader and passes its Result
// (reader or failure) back to the caller unchanged.
Result<std::shared_ptr<arrow::RecordBatchReader>> Space::ScanData() {
  auto data_reader = RecordReader::MakeScanDataReader(manifest_, fs_);
  return data_reader;
}

} // namespace milvus_storage
9 changes: 9 additions & 0 deletions cpp/src/storage/space.h
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#pragma once
#include <arrow/record_batch.h>
#include <atomic>
#include <mutex>

Expand All @@ -16,6 +17,12 @@ class Space {

std::unique_ptr<arrow::RecordBatchReader> Read(std::shared_ptr<ReadOptions> option);

// Scan delete files
Result<std::shared_ptr<arrow::RecordBatchReader>> ScanDelete();

// Scan data files without filtering deleted data
Result<std::shared_ptr<arrow::RecordBatchReader>> ScanData();

Status Delete(arrow::RecordBatchReader* reader);

// Open opens a space, or creates it if the space does not exist.
Expand All @@ -33,6 +40,8 @@ class Space {
// Get the byte size of a blob.
Result<int64_t> GetBlobByteSize(std::string name);

std::vector<Blob> StatisticsBlobs();

private:
Status Init();

Expand Down

0 comments on commit fb6014c

Please sign in to comment.