Skip to content

Commit

Permalink
[Cpp]: add arrow utils ut (#111)
Browse files Browse the repository at this point in the history
Signed-off-by: sunby <[email protected]>
  • Loading branch information
sunby committed Jan 5, 2024
1 parent e532ae6 commit 152629e
Show file tree
Hide file tree
Showing 8 changed files with 78 additions and 15 deletions.
2 changes: 1 addition & 1 deletion cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ find_package(glog REQUIRED)

file(GLOB_RECURSE SRC_FILES src/*.cpp src/*.cc)
add_library(milvus-storage ${SRC_FILES})
target_include_directories(milvus-storage PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/include/milvus-storage ${CMAKE_CURRENT_SOURCE_DIR}/src)
target_include_directories(milvus-storage PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/include/milvus-storage ${CMAKE_CURRENT_SOURCE_DIR}/src ${CMAKE_CURRENT_SOURCE_DIR}/test/include)
target_link_libraries(milvus-storage PUBLIC
arrow::libarrow
arrow::libparquet
Expand Down
4 changes: 2 additions & 2 deletions cpp/include/milvus-storage/format/parquet/file_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ namespace milvus_storage {

class ParquetFileWriter : public FileWriter {
public:
ParquetFileWriter(std::shared_ptr<arrow::Schema> schema, arrow::fs::FileSystem& fs, std::string& file_path);
ParquetFileWriter(std::shared_ptr<arrow::Schema> schema, arrow::fs::FileSystem& fs, const std::string& file_path);

Status Init() override;

Expand All @@ -34,7 +34,7 @@ class ParquetFileWriter : public FileWriter {
private:
arrow::fs::FileSystem& fs_;
std::shared_ptr<arrow::Schema> schema_;
std::string file_path_;
const std::string file_path_;

std::unique_ptr<parquet::arrow::FileWriter> writer_;
int64_t count_ = 0;
Expand Down
10 changes: 5 additions & 5 deletions cpp/src/common/fs_util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@ namespace milvus_storage {
Result<std::unique_ptr<arrow::fs::FileSystem>> BuildFileSystem(const std::string& uri, std::string* out_path) {
arrow::internal::Uri uri_parser;
RETURN_ARROW_NOT_OK(uri_parser.Parse(uri));
auto schema = uri_parser.scheme();
if (schema == "file") {
auto scheme = uri_parser.scheme();
if (scheme == "file") {
if (out_path == nullptr) {
return Status::InvalidArgument("out_path should not be nullptr if schema is file");
return Status::InvalidArgument("out_path should not be nullptr if scheme is file");
}
ASSIGN_OR_RETURN_ARROW_NOT_OK(auto option, arrow::fs::LocalFileSystemOptions::FromUri(uri_parser, out_path));
return std::unique_ptr<arrow::fs::FileSystem>(new arrow::fs::LocalFileSystem(option));
Expand All @@ -42,7 +42,7 @@ Result<std::unique_ptr<arrow::fs::FileSystem>> BuildFileSystem(const std::string
// return std::shared_ptr<arrow::fs::FileSystem>(fs);
// }

if (schema == "opendal") {
if (scheme == "opendal") {
ASSIGN_OR_RETURN_ARROW_NOT_OK(auto option, OpendalOptions::FromUri(uri_parser, out_path));
ASSIGN_OR_RETURN_ARROW_NOT_OK(auto fs, OpendalFileSystem::Make(option));
return std::unique_ptr<arrow::fs::FileSystem>(std::move(fs));
Expand All @@ -64,7 +64,7 @@ Result<std::unique_ptr<arrow::fs::FileSystem>> BuildFileSystem(const std::string
// return std::shared_ptr<arrow::fs::FileSystem>(fs);
// }
//
return Status::InvalidArgument("Unsupported schema: " + schema);
return Status::InvalidArgument("Unsupported schema: " + scheme);
}
/**
* Uri Convert to Path
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/format/parquet/file_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ namespace milvus_storage {

ParquetFileWriter::ParquetFileWriter(std::shared_ptr<arrow::Schema> schema,
arrow::fs::FileSystem& fs,
std::string& file_path)
const std::string& file_path)
: schema_(std::move(schema)), fs_(fs), file_path_(file_path) {}

Status ParquetFileWriter::Init() {
Expand Down
9 changes: 3 additions & 6 deletions cpp/test/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,13 +1,10 @@
find_package(GTest REQUIRED)

file(GLOB_RECURSE BUSTUB_TEST_SOURCES "${PROJECT_SOURCE_DIR}/test/*.cpp")

add_executable(
milvus_test
options_test.cpp
schema_test.cpp
manifest_test.cpp
space_test.cpp
test_util.cpp
multi_files_sequential_reader_test.cpp
${BUSTUB_TEST_SOURCES}
)

target_link_libraries(
Expand Down
44 changes: 44 additions & 0 deletions cpp/test/common/arrow_utils_test.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Copyright 2023 Zilliz
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <boost/filesystem/operations.hpp>
#include "common/arrow_util.h"
#include "common/fs_util.h"
#include "test_util.h"
#include "gtest/gtest.h"
#include "boost/filesystem/path.hpp"

namespace milvus_storage {

class ArrowUtilsTest : public testing::Test {
protected:
void SetUp() override {
path_ = boost::filesystem::temp_directory_path() / boost::filesystem::unique_path();
boost::filesystem::create_directories(path_);
}
void TearDown() override { boost::filesystem::remove_all(path_); }
boost::filesystem::path path_;
};

TEST_F(ArrowUtilsTest, TestMakeArrowRecordBatchReader) {
std::string out;
ASSERT_AND_ASSIGN(auto fs, BuildFileSystem("file://" + path_.string(), &out));
auto file_path = path_.string() + "/test.parquet";
ASSERT_STATUS_OK(PrepareSimpleParquetFile(*fs, file_path, 1));
ASSERT_AND_ASSIGN(auto file_reader, MakeArrowFileReader(*fs, file_path));
ASSERT_AND_ASSIGN(auto batch_reader, MakeArrowRecordBatchReader(*file_reader));
ASSERT_AND_ARROW_ASSIGN(auto batch, batch_reader->Next());
ASSERT_EQ(1, batch->num_rows());
}
} // namespace milvus_storage
4 changes: 4 additions & 0 deletions cpp/test/test_util.h → cpp/test/include/test_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
#include <string>
#include "arrow/type.h"
#include "common/macro.h"
#include "common/status.h"
#include "arrow/filesystem/filesystem.h"

namespace milvus_storage {
#define ASSERT_STATUS_OK(status) \
Expand Down Expand Up @@ -46,4 +48,6 @@ namespace milvus_storage {
#define ASSERT_AND_ARROW_ASSIGN(lhs, rexpr) ASSERT_AND_ARROW_ASSIGN_IMPL(CONCAT(_tmp_value, __COUNTER__), lhs, rexpr);
std::shared_ptr<arrow::Schema> CreateArrowSchema(std::vector<std::string> field_names,
std::vector<std::shared_ptr<arrow::DataType>> field_types);

Status PrepareSimpleParquetFile(arrow::fs::FileSystem& fs, const std::string& file_path, int num_rows);
} // namespace milvus_storage
18 changes: 18 additions & 0 deletions cpp/test/test_util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@
// limitations under the License.

#include "test_util.h"
#include <arrow/type_fwd.h>
#include "format/parquet/file_writer.h"
#include "arrow/array/builder_primitive.h"
namespace milvus_storage {
std::shared_ptr<arrow::Schema> CreateArrowSchema(std::vector<std::string> field_names,
std::vector<std::shared_ptr<arrow::DataType>> field_types) {
Expand All @@ -22,4 +25,19 @@ std::shared_ptr<arrow::Schema> CreateArrowSchema(std::vector<std::string> field_
}
return std::make_shared<arrow::Schema>(fields);
}

Status PrepareSimpleParquetFile(arrow::fs::FileSystem& fs, const std::string& file_path, int num_rows) {
auto schema = CreateArrowSchema({"f_int64"}, {arrow::int64()});
ParquetFileWriter w(schema, fs, file_path);
w.Init();
arrow::Int64Builder builder;
for (int i = 0; i < num_rows; i++) {
RETURN_ARROW_NOT_OK(builder.Append(i));
}
std::shared_ptr<arrow::Array> array;
RETURN_ARROW_NOT_OK(builder.Finish(&array));
auto batch = arrow::RecordBatch::Make(schema, num_rows, {array});
RETURN_NOT_OK(w.Write(*batch));
return w.Close();
}
} // namespace milvus_storage

0 comments on commit 152629e

Please sign in to comment.