Skip to content

Commit

Permalink
Add scan interfaces
Browse files Browse the repository at this point in the history
Signed-off-by: sunby <[email protected]>
  • Loading branch information
sunby committed Sep 1, 2023
1 parent 9875aa9 commit 93b68e3
Show file tree
Hide file tree
Showing 6 changed files with 54 additions and 5 deletions.
20 changes: 20 additions & 0 deletions cpp/src/reader/record_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@
#include <arrow/filesystem/filesystem.h>
#include <cstdint>
#include <memory>
#include "common/macro.h"
#include "file/delete_fragment.h"
#include "file/fragment.h"
#include "reader/common/combine_reader.h"
#include "reader/filter_query_record_reader.h"
#include "reader/merge_record_reader.h"
#include "reader/scan_record_reader.h"
#include "common/utils.h"

namespace milvus_storage {
DeleteFragmentVector FilterDeleteFragments(FragmentVector& data_fragments, DeleteFragmentVector& delete_fragments) {
Expand Down Expand Up @@ -85,4 +88,21 @@ bool RecordReader::filters_only_contain_pk_and_version(std::shared_ptr<Schema> s
return true;
}

// Builds a reader that scans the data files listed in `manifest` via `fs`.
//
// Two MultiFilesSequentialReader instances are created: one over the scalar
// fragments with the scalar schema, one over the vector fragments with the
// vector schema. Both use the default read options. The pair is then handed to
// CombineReader::Make together with the full manifest schema. Delete fragments
// are not consulted here.
//
// NOTE(review): ASSIGN_OR_RETURN_NOT_OK is defined elsewhere; presumably it
// returns early from this function when CombineReader::Make fails -- confirm.
Result<std::shared_ptr<arrow::RecordBatchReader>> RecordReader::MakeScanDataReader(
    std::shared_ptr<Manifest> manifest, std::shared_ptr<arrow::fs::FileSystem> fs) {
  const auto& scalar_files = manifest->scalar_fragments();
  const auto& vector_files = manifest->vector_fragments();

  auto scalar_files_reader = std::make_shared<MultiFilesSequentialReader>(
      fs, scalar_files, manifest->schema()->scalar_schema(), ReadOptions::default_read_options());
  auto vector_files_reader = std::make_shared<MultiFilesSequentialReader>(
      fs, vector_files, manifest->schema()->vector_schema(), ReadOptions::default_read_options());

  ASSIGN_OR_RETURN_NOT_OK(auto combined,
                          CombineReader::Make(scalar_files_reader, vector_files_reader, manifest->schema()));

  // Hand the combined reader back through the generic Arrow reader interface.
  return std::static_pointer_cast<arrow::RecordBatchReader>(combined);
}

// Builds a reader that scans the delete files listed in `manifest` via `fs`.
//
// A single MultiFilesSequentialReader is created over the manifest's delete
// fragments, using the delete schema and the default read options, and is
// returned through the arrow::RecordBatchReader interface.
std::shared_ptr<arrow::RecordBatchReader> RecordReader::MakeScanDeleteReader(
    std::shared_ptr<Manifest> manifest, std::shared_ptr<arrow::fs::FileSystem> fs) {
  const auto& delete_files = manifest->delete_fragments();
  return std::make_shared<MultiFilesSequentialReader>(fs, delete_files, manifest->schema()->delete_schema(),
                                                      ReadOptions::default_read_options());
}
} // namespace milvus_storage
6 changes: 6 additions & 0 deletions cpp/src/reader/record_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,11 @@ struct RecordReader {

// NOTE(review): the definition is not fully visible here. Presumably reports
// whether every filter in `filters` refers only to the primary-key and version
// columns of `schema` -- confirm against the implementation.
static bool filters_only_contain_pk_and_version(std::shared_ptr<Schema> schema,
const std::vector<std::unique_ptr<Filter>>& filters);

// Creates a reader over the scalar and vector fragments of `manifest`, read
// through `fs` and joined by CombineReader::Make with the manifest schema.
// Delete fragments are not read by this function.
static Result<std::shared_ptr<arrow::RecordBatchReader>> MakeScanDataReader(
std::shared_ptr<Manifest> manifest, std::shared_ptr<arrow::fs::FileSystem> fs);

// Creates a reader over the delete fragments of `manifest`, read through `fs`
// with the manifest's delete schema and the default read options.
static std::shared_ptr<arrow::RecordBatchReader> MakeScanDeleteReader(std::shared_ptr<Manifest> manifest,
std::shared_ptr<arrow::fs::FileSystem> fs);
};
} // namespace milvus_storage
2 changes: 2 additions & 0 deletions cpp/src/storage/manifest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ Result<Blob> Manifest::get_blob(std::string& name) {
return *iter;
}

const std::vector<Blob>& Manifest::blobs() const { return blobs_; }

int64_t Manifest::version() const { return version_; }

void Manifest::set_version(int64_t version) { version_ = version; }
Expand Down
12 changes: 7 additions & 5 deletions cpp/src/storage/manifest.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,23 +21,25 @@ class Manifest {

void add_blob(Blob&& blob);

const FragmentVector& scalar_fragments() const;
[[nodiscard]] const FragmentVector& scalar_fragments() const;

const FragmentVector& vector_fragments() const;
[[nodiscard]] const FragmentVector& vector_fragments() const;

const FragmentVector& delete_fragments() const;
[[nodiscard]] const FragmentVector& delete_fragments() const;

bool has_blob(std::string& name);

void remove_blob_if_exist(std::string& name);

Result<Blob> get_blob(std::string& name);

int64_t version() const;
// Returns a const reference to the blobs stored in this manifest.
[[nodiscard]] const std::vector<Blob>& blobs() const;

// Returns the version number stored in this manifest.
[[nodiscard]] int64_t version() const;

// Replaces the version number stored in this manifest with `version`.
void set_version(int64_t version);

Result<manifest_proto::Manifest> ToProtobuf() const;
[[nodiscard]] Result<manifest_proto::Manifest> ToProtobuf() const;

void FromProtobuf(const manifest_proto::Manifest& manifest);

Expand Down
10 changes: 10 additions & 0 deletions cpp/src/storage/space.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -295,4 +295,14 @@ Result<arrow::fs::FileInfoVector> Space::FindAllManifest(std::shared_ptr<arrow::
return info_vec;
}

// Returns a copy of the blob list recorded in the current manifest.
std::vector<Blob> Space::StatisticsBlobs() {
  return manifest_->blobs();
}

// Scans the delete files of this space: builds the reader with
// RecordReader::MakeScanDeleteReader from the current manifest and file system
// and returns it wrapped in a Result.
Result<std::shared_ptr<arrow::RecordBatchReader>> Space::ScanDelete() {
  auto delete_reader = RecordReader::MakeScanDeleteReader(manifest_, fs_);
  return delete_reader;
}

// Scans the data files of this space without filtering deleted rows: forwards
// the current manifest and file system to RecordReader::MakeScanDataReader and
// returns its result unchanged.
Result<std::shared_ptr<arrow::RecordBatchReader>> Space::ScanData() {
  auto data_reader = RecordReader::MakeScanDataReader(manifest_, fs_);
  return data_reader;
}

} // namespace milvus_storage
9 changes: 9 additions & 0 deletions cpp/src/storage/space.h
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#pragma once
#include <arrow/record_batch.h>
#include <atomic>
#include <mutex>

Expand All @@ -16,6 +17,12 @@ class Space {

std::unique_ptr<arrow::RecordBatchReader> Read(std::shared_ptr<ReadOptions> option);

// Scans the delete files: returns a reader built by
// RecordReader::MakeScanDeleteReader from this space's manifest and file system.
Result<std::shared_ptr<arrow::RecordBatchReader>> ScanDelete();

// Scans the data files without filtering out deleted data: returns the result
// of RecordReader::MakeScanDataReader for this space's manifest and file system.
Result<std::shared_ptr<arrow::RecordBatchReader>> ScanData();

Status Delete(arrow::RecordBatchReader* reader);

// Open opens an existing space, or creates it if the space does not exist.
Expand All @@ -33,6 +40,8 @@ class Space {
// Get the byte size of a blob.
Result<int64_t> GetBlobByteSize(std::string name);

// Returns a copy of the blobs recorded in this space's current manifest.
std::vector<Blob> StatisticsBlobs();

private:
Status Init();

Expand Down

0 comments on commit 93b68e3

Please sign in to comment.