diff --git a/cpp/test/CMakeLists.txt b/cpp/test/CMakeLists.txt index 4cc219a7..462864df 100644 --- a/cpp/test/CMakeLists.txt +++ b/cpp/test/CMakeLists.txt @@ -7,6 +7,7 @@ add_executable( manifest_test.cpp space_test.cpp test_util.cpp + multi_files_sequential_reader_test.cpp ) target_link_libraries( diff --git a/cpp/test/multi_files_sequential_reader_test.cpp b/cpp/test/multi_files_sequential_reader_test.cpp new file mode 100644 index 00000000..66afcacb --- /dev/null +++ b/cpp/test/multi_files_sequential_reader_test.cpp @@ -0,0 +1,58 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "file/fragment.h" +#include "gmock/gmock.h" +#include "reader/multi_files_sequential_reader.h" +#include "storage/options.h" +#include "test_util.h" +#include "arrow/table.h" +#include "common/fs_util.h" +namespace milvus_storage { +TEST(MultiFilesSeqReaderTest, ReadTest) { + auto arrow_schema = CreateArrowSchema({"pk_field"}, {arrow::int64()}); + arrow::Int64Builder pk_builder; + ASSERT_STATUS_OK(pk_builder.Append(1)); + ASSERT_STATUS_OK(pk_builder.Append(2)); + ASSERT_STATUS_OK(pk_builder.Append(3)); + std::shared_ptr pk_array; + ASSERT_STATUS_OK(pk_builder.Finish(&pk_array)); + auto rec_batch = arrow::RecordBatch::Make(arrow_schema, 3, {pk_array}); + + ASSERT_AND_ASSIGN(auto fs, BuildFileSystem("file:///tmp/")); + ASSERT_AND_ARROW_ASSIGN(auto f1, fs->OpenOutputStream("/tmp/file1")); + ASSERT_AND_ARROW_ASSIGN(auto w1, parquet::arrow::FileWriter::Open(*arrow_schema, arrow::default_memory_pool(), f1)); + ASSERT_STATUS_OK(w1->WriteRecordBatch(*rec_batch)); + ASSERT_STATUS_OK(w1->Close()); + ASSERT_STATUS_OK(f1->Close()); + ASSERT_AND_ARROW_ASSIGN(auto f2, fs->OpenOutputStream("/tmp/file2")); + ASSERT_AND_ARROW_ASSIGN(auto w2, parquet::arrow::FileWriter::Open(*arrow_schema, arrow::default_memory_pool(), f2)); + ASSERT_STATUS_OK(w2->WriteRecordBatch(*rec_batch)); + ASSERT_STATUS_OK(w2->Close()); + ASSERT_STATUS_OK(f2->Close()); + + Fragment frag(1); + frag.add_file("/tmp/file1"); + frag.add_file("/tmp/file2"); + auto opt = std::make_shared(); + opt->columns.emplace_back("pk_field"); + MultiFilesSequentialReader r(fs, {frag}, arrow_schema, opt); + ASSERT_AND_ARROW_ASSIGN(auto table, r.ToTable()); + ASSERT_AND_ARROW_ASSIGN(auto combined_table, table->CombineChunks()); + auto pk_res = std::dynamic_pointer_cast(combined_table->GetColumnByName("pk_field")->chunk(0)); + std::vector pks; + pks.reserve(pk_res->length()); + for (int i = 0; i < pk_res->length(); ++i) { + pks.push_back(pk_res->Value(i)); + } + ASSERT_THAT(pks, testing::ElementsAre(1, 2, 3, 1, 2, 3)); + ASSERT_STATUS_OK(r.Close()); +} + +} // namespace milvus_storage \ No newline at end of file diff --git a/cpp/test/test_util.h b/cpp/test/test_util.h index 1f7b3441..4b050b1d 100644 --- a/cpp/test/test_util.h +++ b/cpp/test/test_util.h @@ -4,6 +4,7 @@ #include #include #include "arrow/type.h" +#include "common/macro.h" namespace milvus_storage { #define ASSERT_STATUS_OK(status) \