Skip to content

Commit

Permalink
[Cpp]: make fs unique ptr (#107)
Browse files Browse the repository at this point in the history
Signed-off-by: sunby <[email protected]>
  • Loading branch information
sunby committed Jan 3, 2024
1 parent ec1bbbf commit 3c84114
Show file tree
Hide file tree
Showing 25 changed files with 89 additions and 89 deletions.
4 changes: 2 additions & 2 deletions cpp/include/milvus-storage/common/arrow_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
#include "storage/options.h"

namespace milvus_storage {
Result<std::shared_ptr<parquet::arrow::FileReader>> MakeArrowFileReader(std::shared_ptr<arrow::fs::FileSystem> fs,
std::string& file_path);
Result<std::shared_ptr<parquet::arrow::FileReader>> MakeArrowFileReader(arrow::fs::FileSystem& fs,
const std::string& file_path);

Result<std::shared_ptr<arrow::RecordBatchReader>> MakeArrowRecordBatchReader(
std::shared_ptr<parquet::arrow::FileReader> reader,
Expand Down
4 changes: 2 additions & 2 deletions cpp/include/milvus-storage/common/fs_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
#include "result.h"
namespace milvus_storage {

Result<std::shared_ptr<arrow::fs::FileSystem>> BuildFileSystem(const std::string& uri, std::string* out_path = nullptr);
Result<std::unique_ptr<arrow::fs::FileSystem>> BuildFileSystem(const std::string& uri, std::string* out_path = nullptr);

std::string UriToPath(const std::string& uri);

} // namespace milvus_storage
} // namespace milvus_storage
2 changes: 1 addition & 1 deletion cpp/include/milvus-storage/common/opendal_fs.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ class OpendalFileSystem : public arrow::fs::FileSystem {
const std::string& path, const std::shared_ptr<const arrow::KeyValueMetadata>& metadata = {}) override;

/// Create an OpendalFileSystem instance from the given options.
static arrow::Result<std::shared_ptr<OpendalFileSystem>> Make(
static arrow::Result<std::unique_ptr<OpendalFileSystem>> Make(
const OpendalOptions& options, const arrow::io::IOContext& = arrow::io::default_io_context());

protected:
Expand Down
12 changes: 6 additions & 6 deletions cpp/include/milvus-storage/file/delete_fragment.h
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
// Copyright 2023 Zilliz
//
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//
// http://www.apache.org/licenses/LICENSE-2.0
//
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand All @@ -32,7 +32,7 @@ class DeleteFragment {
public:
DeleteFragment() = default;

DeleteFragment(std::shared_ptr<arrow::fs::FileSystem> fs, std::shared_ptr<Schema> schema, int64_t id = 0);
DeleteFragment(arrow::fs::FileSystem& fs, std::shared_ptr<Schema> schema, int64_t id = 0);

bool id() const { return id_; }

Expand All @@ -46,14 +46,14 @@ class DeleteFragment {

Status Add(std::shared_ptr<arrow::RecordBatch> batch);
// Make an instance of DeleteFragment of the given fragment whose type is kDelete
static Result<DeleteFragment> Make(std::shared_ptr<arrow::fs::FileSystem> fs,
static Result<DeleteFragment> Make(arrow::fs::FileSystem& fs,
std::shared_ptr<Schema> schema,
const Fragment& fragment);

private:
int64_t id_;
std::shared_ptr<Schema> schema_;
std::shared_ptr<arrow::fs::FileSystem> fs_;
arrow::fs::FileSystem& fs_;
// the deleted data parsed from the files of fragment_
std::unordered_map<pk_type, std::vector<int64_t>> data_; // pk to versions(if exists)
};
Expand Down
16 changes: 7 additions & 9 deletions cpp/include/milvus-storage/format/parquet/file_writer.h
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
// Copyright 2023 Zilliz
//
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//
// http://www.apache.org/licenses/LICENSE-2.0
//
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand All @@ -21,24 +21,22 @@ namespace milvus_storage {

class ParquetFileWriter : public FileWriter {
public:
ParquetFileWriter(std::shared_ptr<arrow::Schema> schema,
std::shared_ptr<arrow::fs::FileSystem> fs,
std::string& file_path);
ParquetFileWriter(std::shared_ptr<arrow::Schema> schema, arrow::fs::FileSystem& fs, std::string& file_path);

Status Init() override;

Status Write(arrow::RecordBatch* record) override;
Status Write(const arrow::RecordBatch& record) override;

int64_t count() override;

Status Close() override;

private:
std::shared_ptr<arrow::fs::FileSystem> fs_;
arrow::fs::FileSystem& fs_;
std::shared_ptr<arrow::Schema> schema_;
std::string file_path_;

std::unique_ptr<parquet::arrow::FileWriter> writer_;
int64_t count_ = 0;
};
} // namespace milvus_storage
} // namespace milvus_storage
4 changes: 2 additions & 2 deletions cpp/include/milvus-storage/format/writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ class FileWriter {
public:
virtual Status Init() = 0;

virtual Status Write(arrow::RecordBatch* record) = 0;
virtual Status Write(const arrow::RecordBatch& record) = 0;

virtual int64_t count() = 0;

virtual Status Close() = 0;
};
} // namespace milvus_storage
} // namespace milvus_storage
10 changes: 5 additions & 5 deletions cpp/include/milvus-storage/reader/filter_query_record_reader.h
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
// Copyright 2023 Zilliz
//
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//
// http://www.apache.org/licenses/LICENSE-2.0
//
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand All @@ -26,7 +26,7 @@ class FilterQueryRecordReader : public arrow::RecordBatchReader {
const FragmentVector& scalar_fragments,
const FragmentVector& vector_fragments,
const DeleteFragmentVector& delete_fragments,
std::shared_ptr<arrow::fs::FileSystem> fs,
arrow::fs::FileSystem& fs,
std::shared_ptr<Schema> schema);

std::shared_ptr<arrow::Schema> schema() const override;
Expand All @@ -37,7 +37,7 @@ class FilterQueryRecordReader : public arrow::RecordBatchReader {
// Tries to make the inner reader; returns nullptr if next_pos_ reaches the end.
Result<std::shared_ptr<arrow::RecordBatchReader>> MakeInnerReader();

std::shared_ptr<arrow::fs::FileSystem> fs_;
arrow::fs::FileSystem& fs_;
std::shared_ptr<Schema> schema_;
const ReadOptions options_;
DeleteFragmentVector delete_fragments_;
Expand Down
10 changes: 5 additions & 5 deletions cpp/include/milvus-storage/reader/merge_record_reader.h
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
// Copyright 2023 Zilliz
//
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//
// http://www.apache.org/licenses/LICENSE-2.0
//
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand All @@ -30,7 +30,7 @@ class MergeRecordReader : public arrow::RecordBatchReader {
const FragmentVector& scalar_fragments,
const FragmentVector& vector_fragments,
const DeleteFragmentVector& delete_fragments,
std::shared_ptr<arrow::fs::FileSystem> fs,
arrow::fs::FileSystem& fs,
std::shared_ptr<Schema> schema);

std::shared_ptr<arrow::Schema> schema() const override;
Expand All @@ -40,7 +40,7 @@ class MergeRecordReader : public arrow::RecordBatchReader {
private:
Result<std::shared_ptr<arrow::RecordBatchReader>> MakeInnerReader();

std::shared_ptr<arrow::fs::FileSystem> fs_;
arrow::fs::FileSystem& fs_;
std::shared_ptr<Schema> schema_;
const ReadOptions options_;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
// Copyright 2023 Zilliz
//
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//
// http://www.apache.org/licenses/LICENSE-2.0
//
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand All @@ -24,7 +24,7 @@ namespace milvus_storage {

class MultiFilesSequentialReader : public arrow::RecordBatchReader {
public:
MultiFilesSequentialReader(std::shared_ptr<arrow::fs::FileSystem> fs,
MultiFilesSequentialReader(arrow::fs::FileSystem& fs,
const FragmentVector& fragments,
std::shared_ptr<arrow::Schema> schema,
const ReadOptions& options);
Expand All @@ -34,7 +34,7 @@ class MultiFilesSequentialReader : public arrow::RecordBatchReader {
arrow::Status ReadNext(std::shared_ptr<arrow::RecordBatch>* batch) override;

private:
std::shared_ptr<arrow::fs::FileSystem> fs_;
arrow::fs::FileSystem& fs_;
std::shared_ptr<arrow::Schema> schema_;
std::vector<std::string> files_;

Expand Down
8 changes: 4 additions & 4 deletions cpp/include/milvus-storage/reader/record_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ namespace milvus_storage {
namespace internal {
std::unique_ptr<arrow::RecordBatchReader> MakeRecordReader(std::shared_ptr<Manifest> manifest,
std::shared_ptr<Schema> schema,
std::shared_ptr<arrow::fs::FileSystem> fs,
DeleteFragmentVector delete_fragments,
arrow::fs::FileSystem& fs,
const DeleteFragmentVector& delete_fragments,
const ReadOptions& options);

bool only_contain_scalar_columns(std::shared_ptr<Schema> schema, const std::set<std::string>& related_columns);
Expand All @@ -32,9 +32,9 @@ bool only_contain_vector_columns(std::shared_ptr<Schema> schema, const std::set<
bool filters_only_contain_pk_and_version(std::shared_ptr<Schema> schema, const Filter::FilterSet& filters);

std::unique_ptr<arrow::RecordBatchReader> MakeScanDataReader(std::shared_ptr<Manifest> manifest,
std::shared_ptr<arrow::fs::FileSystem> fs);
arrow::fs::FileSystem& fs);

std::unique_ptr<arrow::RecordBatchReader> MakeScanDeleteReader(std::shared_ptr<Manifest> manifest,
std::shared_ptr<arrow::fs::FileSystem> fs);
arrow::fs::FileSystem& fs);
} // namespace internal
} // namespace milvus_storage
10 changes: 5 additions & 5 deletions cpp/include/milvus-storage/reader/scan_record_reader.h
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
// Copyright 2023 Zilliz
//
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//
// http://www.apache.org/licenses/LICENSE-2.0
//
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand All @@ -23,7 +23,7 @@ class ScanRecordReader : public arrow::RecordBatchReader {
public:
ScanRecordReader(std::shared_ptr<Schema> schema,
const ReadOptions& options,
std::shared_ptr<arrow::fs::FileSystem> fs,
arrow::fs::FileSystem& fs,
const FragmentVector& fragments,
const DeleteFragmentVector& delete_fragments);

Expand All @@ -36,7 +36,7 @@ class ScanRecordReader : public arrow::RecordBatchReader {

std::shared_ptr<Schema> schema_;
const ReadOptions options_;
std::shared_ptr<arrow::fs::FileSystem> fs_;
arrow::fs::FileSystem& fs_;
const FragmentVector fragments_;
const DeleteFragmentVector delete_fragments_;
std::shared_ptr<arrow::RecordBatchReader> reader_;
Expand Down
2 changes: 2 additions & 0 deletions cpp/include/milvus-storage/storage/manifest.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ class Manifest {
arrow::fs::FileInfo& file_info);

private:
// Arrow's RecordBatchReader has a schema() method that returns a shared_ptr-wrapped schema,
// so we store schema_ as a shared_ptr here to avoid a copy.
std::shared_ptr<Schema> schema_;
FragmentVector scalar_fragments_;
FragmentVector vector_fragments_;
Expand Down
3 changes: 2 additions & 1 deletion cpp/include/milvus-storage/storage/space.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ class RecordReader;

class Space {
public:

Status Write(arrow::RecordBatchReader& reader, const WriteOption& option);

std::unique_ptr<arrow::RecordBatchReader> Read(const ReadOptions& option) const;
Expand Down Expand Up @@ -67,7 +68,7 @@ class Space {

static Result<arrow::fs::FileInfoVector> FindAllManifest(arrow::fs::FileSystem& fs, const std::string& path);

std::shared_ptr<arrow::fs::FileSystem> fs_;
std::unique_ptr<arrow::fs::FileSystem> fs_;
std::shared_ptr<Manifest> manifest_;
std::string path_;

Expand Down
6 changes: 3 additions & 3 deletions cpp/src/common/arrow_util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@
#include "common/macro.h"

namespace milvus_storage {
Result<std::shared_ptr<parquet::arrow::FileReader>> MakeArrowFileReader(std::shared_ptr<arrow::fs::FileSystem> fs,
std::string& file_path) {
ASSIGN_OR_RETURN_ARROW_NOT_OK(auto file, fs->OpenInputFile(file_path));
Result<std::shared_ptr<parquet::arrow::FileReader>> MakeArrowFileReader(arrow::fs::FileSystem& fs,
const std::string& file_path) {
ASSIGN_OR_RETURN_ARROW_NOT_OK(auto file, fs.OpenInputFile(file_path));

std::unique_ptr<parquet::arrow::FileReader> file_reader;
RETURN_ARROW_NOT_OK(parquet::arrow::OpenFile(file, arrow::default_memory_pool(), &file_reader));
Expand Down
6 changes: 3 additions & 3 deletions cpp/src/common/fs_util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

namespace milvus_storage {

Result<std::shared_ptr<arrow::fs::FileSystem>> BuildFileSystem(const std::string& uri, std::string* out_path) {
Result<std::unique_ptr<arrow::fs::FileSystem>> BuildFileSystem(const std::string& uri, std::string* out_path) {
arrow::internal::Uri uri_parser;
RETURN_ARROW_NOT_OK(uri_parser.Parse(uri));
auto schema = uri_parser.scheme();
Expand All @@ -33,7 +33,7 @@ Result<std::shared_ptr<arrow::fs::FileSystem>> BuildFileSystem(const std::string
return Status::InvalidArgument("out_path should not be nullptr if schema is file");
}
ASSIGN_OR_RETURN_ARROW_NOT_OK(auto option, arrow::fs::LocalFileSystemOptions::FromUri(uri_parser, out_path));
return std::shared_ptr<arrow::fs::FileSystem>(new arrow::fs::LocalFileSystem(option));
return std::unique_ptr<arrow::fs::FileSystem>(new arrow::fs::LocalFileSystem(option));
}

// if (schema == "hdfs") {
Expand All @@ -45,7 +45,7 @@ Result<std::shared_ptr<arrow::fs::FileSystem>> BuildFileSystem(const std::string
if (schema == "opendal") {
ASSIGN_OR_RETURN_ARROW_NOT_OK(auto option, OpendalOptions::FromUri(uri_parser, out_path));
ASSIGN_OR_RETURN_ARROW_NOT_OK(auto fs, OpendalFileSystem::Make(option));
return std::shared_ptr<arrow::fs::FileSystem>(fs);
return std::unique_ptr<arrow::fs::FileSystem>(std::move(fs));
}

// if (schema == "s3") {
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/common/opendal_fs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -166,9 +166,9 @@ arrow::Result<std::shared_ptr<OpendalInputFile>> read(arrow::io::IOContext& io_c

class OpendalOutputStream : public arrow::io::OutputStream {};

arrow::Result<std::shared_ptr<OpendalFileSystem>> OpendalFileSystem::Make(const OpendalOptions& options,
arrow::Result<std::unique_ptr<OpendalFileSystem>> OpendalFileSystem::Make(const OpendalOptions& options,
const arrow::io::IOContext& ctx) {
auto fs = std::shared_ptr<OpendalFileSystem>(new OpendalFileSystem(options, ctx));
auto fs = std::unique_ptr<OpendalFileSystem>(new OpendalFileSystem(options, ctx));
opendal_operator_options* op_options_ = opendal_operator_options_new();
for (auto& option : options.options()) {
opendal_operator_options_set(op_options_, option.first.c_str(), option.second.c_str());
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/file/delete_fragment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ arrow::Status DeleteFragmentVisitor::Visit(const arrow::Int64Array& array) { ret

arrow::Status DeleteFragmentVisitor::Visit(const arrow::StringArray& array) { return Visit<arrow::StringArray>(array); }

DeleteFragment::DeleteFragment(std::shared_ptr<arrow::fs::FileSystem> fs, std::shared_ptr<Schema> schema, int64_t id)
DeleteFragment::DeleteFragment(arrow::fs::FileSystem& fs, std::shared_ptr<Schema> schema, int64_t id)
: fs_(fs), schema_(schema), id_(id) {}

Status DeleteFragment::Add(std::shared_ptr<arrow::RecordBatch> batch) {
Expand All @@ -43,7 +43,7 @@ Status DeleteFragment::Add(std::shared_ptr<arrow::RecordBatch> batch) {
return Status::OK();
}

Result<DeleteFragment> DeleteFragment::Make(std::shared_ptr<arrow::fs::FileSystem> fs,
Result<DeleteFragment> DeleteFragment::Make(arrow::fs::FileSystem& fs,
std::shared_ptr<Schema> schema,
const Fragment& fragment) {
DeleteFragment delete_fragment(fs, schema, fragment.id());
Expand Down
Loading

0 comments on commit 3c84114

Please sign in to comment.