diff --git a/bolt/dwio/parquet/reader/ParquetReader.cpp b/bolt/dwio/parquet/reader/ParquetReader.cpp index 8caf7f2ce..54f7f5feb 100644 --- a/bolt/dwio/parquet/reader/ParquetReader.cpp +++ b/bolt/dwio/parquet/reader/ParquetReader.cpp @@ -1548,15 +1548,48 @@ class ParquetRowReader::Impl { } uint64_t skip(uint64_t skipSize) { - auto rowsToSkip = nextReadSize(skipSize); - if (rowsToSkip == kAtEnd) { - return 0; + uint64_t totalSkipped = 0; + + // First, consume rows remaining in the currently loaded row group, if any. + if (skipSize > 0 && currentRowInGroup_ < rowsInCurrentRowGroup_) { + const auto rowsToSkip = std::min( + skipSize, rowsInCurrentRowGroup_ - currentRowInGroup_); + columnReader_->setReadOffset(columnReader_->readOffset() + rowsToSkip); + currentRowInGroup_ += rowsToSkip; + totalSkipped += rowsToSkip; + skipSize -= rowsToSkip; } - BOLT_DCHECK_GT(rowsToSkip, 0); - columnReader_->setReadOffset(columnReader_->readOffset() + rowsToSkip); - currentRowInGroup_ += rowsToSkip; - return rowsToSkip; + // Then, skip over whole row groups purely using footer metadata without + // loading them. When a row group can be fully skipped, just advance the + // index instead of scheduling/loading it. When skip lands inside a row + // group, advance to it (which schedules/loads it) and adjust the in-group + // read offset. + while (skipSize > 0 && nextRowGroupIdsIdx_ < rowGroupIds_.size()) { + const auto nextRowGroupIndex = rowGroupIds_[nextRowGroupIdsIdx_]; + const auto rowsInNextGroup = rowGroups_[nextRowGroupIndex].num_rows; + + if (skipSize >= rowsInNextGroup) { + // Skip the whole row group by advancing the index; do not schedule or + // load it. + skipSize -= rowsInNextGroup; + totalSkipped += rowsInNextGroup; + ++nextRowGroupIdsIdx_; + continue; + } + + // Landing inside this row group: load it now and seek to the in-group + // offset. 
+ if (!advanceToNextRowGroup()) { + break; + } + columnReader_->setReadOffset(columnReader_->readOffset() + skipSize); + currentRowInGroup_ = skipSize; + totalSkipped += skipSize; + skipSize = 0; + break; + } + return totalSkipped; } uint64_t next( diff --git a/bolt/dwio/parquet/tests/reader/CMakeLists.txt b/bolt/dwio/parquet/tests/reader/CMakeLists.txt index 5606d2081..eb47dc171 100644 --- a/bolt/dwio/parquet/tests/reader/CMakeLists.txt +++ b/bolt/dwio/parquet/tests/reader/CMakeLists.txt @@ -114,3 +114,12 @@ if(${BOLT_ENABLE_ARROW}) ) endif() + +if(${BOLT_BUILD_BENCHMARKS}) + add_executable(bolt_dwio_parquet_reader_skip_benchmark ParquetReaderSkipBenchmark.cpp) + target_link_libraries( + bolt_dwio_parquet_reader_skip_benchmark + PRIVATE bolt_testutils + ${FOLLY_BENCHMARK} + ) +endif() diff --git a/bolt/dwio/parquet/tests/reader/ParquetReaderSkipBenchmark.cpp b/bolt/dwio/parquet/tests/reader/ParquetReaderSkipBenchmark.cpp new file mode 100644 index 000000000..d9764c6e5 --- /dev/null +++ b/bolt/dwio/parquet/tests/reader/ParquetReaderSkipBenchmark.cpp @@ -0,0 +1,398 @@ +/* + * Copyright (c) ByteDance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+#include <folly/Benchmark.h>
+#include <folly/init/Init.h>
+
+#include "bolt/common/io/IoStatistics.h"
+#include "bolt/dwio/common/FileSink.h"
+#include "bolt/dwio/common/Options.h"
+#include "bolt/dwio/common/tests/utils/DataSetBuilder.h"
+#include "bolt/dwio/parquet/RegisterParquetReader.h"
+#include "bolt/dwio/parquet/reader/ParquetReader.h"
+#include "bolt/dwio/parquet/writer/Writer.h"
+#include "bolt/exec/tests/utils/TempDirectoryPath.h"
+
+using namespace bytedance::bolt;
+using namespace bytedance::bolt::dwio;
+using namespace bytedance::bolt::dwio::common;
+using namespace bytedance::bolt::parquet;
+using namespace bytedance::bolt::test;
+
+namespace {
+
+// Keep each row group deterministic and reasonably small so skip overhead is
+// observable. Data size around 100MB.
+constexpr uint32_t kRowsPerRowGroup = 100'000; // 100k rows per row group
+constexpr uint32_t kNumRowGroups = 5; // 5 row groups
+constexpr uint32_t kNumRowsPerBatch = kRowsPerRowGroup;
+constexpr uint32_t kNumBatches = kNumRowGroups;
+constexpr uint32_t kTotalRows = kRowsPerRowGroup * kNumRowGroups;
+
+class SkipBenchmark {
+ public:
+  SkipBenchmark() {
+    rootPool_ = memory::memoryManager()->addRootPool("ParquetSkipBenchmark");
+    leafPool_ = rootPool_->addLeafChild("ParquetSkipBenchmark");
+    // Use multiple columns so that each row group has a reasonable size
+    rowType_ =
+        ROW({"id",
+             "value1",
+             "value2",
+             "value3",
+             "value4",
+             "value5",
+             "value6",
+             "value7",
+             "value8",
+             "value9"},
+            {BIGINT(),
+             BIGINT(),
+             BIGINT(),
+             BIGINT(),
+             BIGINT(),
+             BIGINT(),
+             BIGINT(),
+             BIGINT(),
+             BIGINT(),
+             BIGINT()});
+    path_ = fileFolder_->path + "/skip_bench.parquet";
+    writeFile();
+  }
+
+  // Produces a 10-column BIGINT parquet file with kNumRowGroups fixed-size
+  // row groups of kRowsPerRowGroup rows each.
+  void writeFile() {
+    auto localWriteFile = std::make_unique<LocalWriteFile>(path_, true, false);
+    auto sink =
+        std::make_unique<WriteFileSink>(std::move(localWriteFile), path_);
+
+    bytedance::bolt::parquet::WriterOptions options;
+    options.memoryPool = rootPool_.get();
+    // Pin the row group boundary at exactly kRowsPerRowGroup rows, regardless
+    // of memory pressure.
+    const uint64_t rowsInRowGroup = kRowsPerRowGroup;
+    options.flushPolicyFactory = [rowsInRowGroup]() {
+      // Large byte limit so only row count decides the boundary.
+      return std::make_unique<DefaultFlushPolicy>(rowsInRowGroup, 1LL << 40);
+    };
+
+    auto writer = std::make_unique<bytedance::bolt::parquet::Writer>(
+        std::move(sink), options, rowType_);
+
+    DataSetBuilder builder(*leafPool_, 0);
+    auto batches =
+        builder.makeDataset(rowType_, kNumBatches, kNumRowsPerBatch).build();
+    for (auto& batch : *batches) {
+      writer->write(batch);
+    }
+    writer->flush();
+    writer->close();
+  }
+
+  std::pair<
+      std::unique_ptr<dwio::common::RowReader>,
+      std::shared_ptr<io::IoStatistics>>
+  makeRowReaderWithStats(int32_t prefetch) {
+    auto ioStats = std::make_shared<io::IoStatistics>();
+    dwio::common::ReaderOptions readerOpts{leafPool_.get()};
+    readerOpts.setPrefetchRowGroups(prefetch);
+    // Avoid preloading the whole file; keep I/O observable at row-group level.
+    readerOpts.setFilePreloadThreshold(0);
+    readerOpts.setFooterEstimatedSize(16);
+    auto input = std::make_unique<dwio::common::BufferedInput>(
+        std::make_shared<LocalReadFile>(path_),
+        readerOpts.getMemoryPool(),
+        dwio::common::MetricsLog::voidLog(),
+        ioStats.get());
+    auto reader = std::make_unique<ParquetReader>(std::move(input), readerOpts);
+
+    dwio::common::RowReaderOptions rowReaderOpts;
+    auto scanSpec = std::make_shared<common::ScanSpec>("root");
+    scanSpec->addAllChildFields(*rowType_);
+    rowReaderOpts.setScanSpec(scanSpec);
+    return {reader->createRowReader(rowReaderOpts), ioStats};
+  }
+
+  uint64_t skipAll(dwio::common::RowReader& rr, uint64_t rows) {
+    uint64_t total = 0;
+    while (rows > 0) {
+      auto n = rr.skip(rows);
+      if (n == 0) {
+        break;
+      }
+      total += n;
+      rows -= n;
+    }
+    return total;
+  }
+
+  // Touch a small batch so that the benchmark captures the cost of landing +
+  // decoding at the skip target (and so the skip optimization actually
+  // influences observed wall time).
+  uint64_t touchBatch(dwio::common::RowReader& rr, uint64_t size) {
+    VectorPtr result = BaseVector::create(rowType_, 0, leafPool_.get());
+    auto n = rr.next(size, result);
+    folly::doNotOptimizeAway(result);
+    return n;
+  }
+
+  // Scenario A: skip kept within the first row group. Baseline.
+  void skipWithinRowGroup(
+      int32_t prefetch,
+      folly::UserCounters& counters,
+      unsigned int iters) {
+    uint64_t totalRawBytes = 0;
+    for (unsigned int i = 0; i < iters; i++) {
+      folly::BenchmarkSuspender suspender;
+      auto [rr, ioStats] = makeRowReaderWithStats(prefetch);
+      suspender.dismiss();
+
+      auto skipped = skipAll(*rr, kRowsPerRowGroup / 2);
+      folly::doNotOptimizeAway(skipped);
+      touchBatch(*rr, 1024);
+
+      totalRawBytes += ioStats->rawBytesRead();
+    }
+    counters["RawBytes"] = totalRawBytes / iters;
+  }
+
+  // Scenario B: skip exactly N whole row groups then read a small batch from
+  // the landing group. The core win of the optimization lives here.
+  void skipWholeRowGroups(
+      int32_t prefetch,
+      uint32_t numWholeGroups,
+      folly::UserCounters& counters,
+      unsigned int iters) {
+    uint64_t totalRawBytes = 0;
+    for (unsigned int i = 0; i < iters; i++) {
+      folly::BenchmarkSuspender suspender;
+      auto [rr, ioStats] = makeRowReaderWithStats(prefetch);
+      suspender.dismiss();
+
+      const uint64_t rows =
+          static_cast<uint64_t>(numWholeGroups) * kRowsPerRowGroup;
+      auto skipped = skipAll(*rr, rows);
+      folly::doNotOptimizeAway(skipped);
+      touchBatch(*rr, 1024);
+
+      totalRawBytes += ioStats->rawBytesRead();
+    }
+    counters["RawBytes"] = totalRawBytes / iters;
+  }
+
+  // Scenario C: skip that lands inside some later row group (not aligned).
+  void skipIntoMiddleOfFarRowGroup(
+      int32_t prefetch,
+      uint32_t numWholeGroups,
+      folly::UserCounters& counters,
+      unsigned int iters) {
+    uint64_t totalRawBytes = 0;
+    for (unsigned int i = 0; i < iters; i++) {
+      folly::BenchmarkSuspender suspender;
+      auto [rr, ioStats] = makeRowReaderWithStats(prefetch);
+      suspender.dismiss();
+
+      const uint64_t rows =
+          static_cast<uint64_t>(numWholeGroups) * kRowsPerRowGroup +
+          kRowsPerRowGroup / 3;
+      auto skipped = skipAll(*rr, rows);
+      folly::doNotOptimizeAway(skipped);
+      touchBatch(*rr, 1024);
+
+      totalRawBytes += ioStats->rawBytesRead();
+    }
+    counters["RawBytes"] = totalRawBytes / iters;
+  }
+
+  // Scenario D: skip past EOF; no subsequent read required.
+  void skipPastEof(
+      int32_t prefetch,
+      folly::UserCounters& counters,
+      unsigned int iters) {
+    uint64_t totalRawBytes = 0;
+    for (unsigned int i = 0; i < iters; i++) {
+      folly::BenchmarkSuspender suspender;
+      auto [rr, ioStats] = makeRowReaderWithStats(prefetch);
+      suspender.dismiss();
+
+      auto skipped = skipAll(*rr, kTotalRows + 1'000'000);
+      folly::doNotOptimizeAway(skipped);
+
+      totalRawBytes += ioStats->rawBytesRead();
+    }
+    counters["RawBytes"] = totalRawBytes / iters;
+  }
+
+  // Scenario E: alternating next()/skip() mimicking a LIMIT/OFFSET style
+  // workload that skims through the file.
+  void alternatingNextSkip(
+      int32_t prefetch,
+      folly::UserCounters& counters,
+      unsigned int iters) {
+    uint64_t totalRawBytes = 0;
+    for (unsigned int i = 0; i < iters; i++) {
+      folly::BenchmarkSuspender suspender;
+      auto [rr, ioStats] = makeRowReaderWithStats(prefetch);
+      suspender.dismiss();
+
+      // Read 1k rows, skip the rest of the row group's worth of rows, repeat.
+      // This exercises many skip()s each crossing exactly one row-group
+      // boundary.
+      uint64_t totalRead = 0;
+      while (true) {
+        auto n = touchBatch(*rr, 1024);
+        if (n == 0) {
+          break;
+        }
+        totalRead += n;
+        auto skipped = skipAll(*rr, kRowsPerRowGroup - 1024);
+        if (skipped == 0) {
+          break;
+        }
+      }
+      folly::doNotOptimizeAway(totalRead);
+
+      totalRawBytes += ioStats->rawBytesRead();
+    }
+    counters["RawBytes"] = totalRawBytes / iters;
+  }
+
+ private:
+  std::shared_ptr<memory::MemoryPool> rootPool_;
+  std::shared_ptr<memory::MemoryPool> leafPool_;
+  RowTypePtr rowType_;
+  std::shared_ptr<bytedance::bolt::exec::test::TempDirectoryPath> fileFolder_ =
+      bytedance::bolt::exec::test::TempDirectoryPath::create();
+  std::string path_;
+};
+
+SkipBenchmark* FOLLY_NULLABLE gFixture = nullptr;
+
+SkipBenchmark& fixture() {
+  BOLT_CHECK_NOT_NULL(gFixture);
+  return *gFixture;
+}
+
+} // namespace
+
+// --- Registrations ---
+
+// Scenario A: skip only within current row group (prefetch0)
+BENCHMARK_COUNTERS(skipWithinRG_prefetch0, counters, iters) {
+  fixture().skipWithinRowGroup(0, counters, iters);
+}
+BENCHMARK_DRAW_LINE();
+
+// Scenario B: skip N whole row groups (prefetch0)
+BENCHMARK_COUNTERS(skip1RG_prefetch0, counters, iters) {
+  fixture().skipWholeRowGroups(0, 1, counters, iters);
+}
+BENCHMARK_COUNTERS(skip2RG_prefetch0, counters, iters) {
+  fixture().skipWholeRowGroups(0, 2, counters, iters);
+}
+BENCHMARK_COUNTERS(skip3RG_prefetch0, counters, iters) {
+  fixture().skipWholeRowGroups(0, 3, counters, iters);
+}
+BENCHMARK_COUNTERS(skip4RG_prefetch0, counters, iters) {
+  fixture().skipWholeRowGroups(0, 4, counters, iters);
+}
+BENCHMARK_DRAW_LINE();
+
+// Scenario C: skip into middle of a row group
(prefetch0) +BENCHMARK_COUNTERS(skipMid1RG_prefetch0, counters, iters) { + fixture().skipIntoMiddleOfFarRowGroup(0, 1, counters, iters); +} +BENCHMARK_COUNTERS(skipMid2RG_prefetch0, counters, iters) { + fixture().skipIntoMiddleOfFarRowGroup(0, 2, counters, iters); +} +BENCHMARK_COUNTERS(skipMid3RG_prefetch0, counters, iters) { + fixture().skipIntoMiddleOfFarRowGroup(0, 3, counters, iters); +} +BENCHMARK_COUNTERS(skipMid4RG_prefetch0, counters, iters) { + fixture().skipIntoMiddleOfFarRowGroup(0, 4, counters, iters); +} +BENCHMARK_DRAW_LINE(); + +// Scenario D: skip past end of file (prefetch0) +BENCHMARK_COUNTERS(skipPastEof_prefetch0, counters, iters) { + fixture().skipPastEof(0, counters, iters); +} +BENCHMARK_DRAW_LINE(); + +// Scenario E: alternating next()/skip() (prefetch0) +BENCHMARK_COUNTERS(alternatingNextSkip_prefetch0, counters, iters) { + fixture().alternatingNextSkip(0, counters, iters); +} +BENCHMARK_DRAW_LINE(); + +// Scenario A: skip only within current row group (prefetch1) +BENCHMARK_COUNTERS(skipWithinRG_prefetch1, counters, iters) { + fixture().skipWithinRowGroup(1, counters, iters); +} +BENCHMARK_DRAW_LINE(); + +// Scenario B: skip N whole row groups (prefetch1) +BENCHMARK_COUNTERS(skip1RG_prefetch1, counters, iters) { + fixture().skipWholeRowGroups(1, 1, counters, iters); +} +BENCHMARK_COUNTERS(skip2RG_prefetch1, counters, iters) { + fixture().skipWholeRowGroups(1, 2, counters, iters); +} +BENCHMARK_COUNTERS(skip3RG_prefetch1, counters, iters) { + fixture().skipWholeRowGroups(1, 3, counters, iters); +} +BENCHMARK_COUNTERS(skip4RG_prefetch1, counters, iters) { + fixture().skipWholeRowGroups(1, 4, counters, iters); +} +BENCHMARK_DRAW_LINE(); + +// Scenario C: skip into middle of a row group (prefetch1) +BENCHMARK_COUNTERS(skipMid1RG_prefetch1, counters, iters) { + fixture().skipIntoMiddleOfFarRowGroup(1, 1, counters, iters); +} +BENCHMARK_COUNTERS(skipMid2RG_prefetch1, counters, iters) { + fixture().skipIntoMiddleOfFarRowGroup(1, 2, counters, 
iters); +} +BENCHMARK_COUNTERS(skipMid3RG_prefetch1, counters, iters) { + fixture().skipIntoMiddleOfFarRowGroup(1, 3, counters, iters); +} +BENCHMARK_COUNTERS(skipMid4RG_prefetch1, counters, iters) { + fixture().skipIntoMiddleOfFarRowGroup(1, 4, counters, iters); +} +BENCHMARK_DRAW_LINE(); + +// Scenario D: skip past end of file (prefetch1) +BENCHMARK_COUNTERS(skipPastEof_prefetch1, counters, iters) { + fixture().skipPastEof(1, counters, iters); +} +BENCHMARK_DRAW_LINE(); + +// Scenario E: alternating next()/skip() (prefetch1) +BENCHMARK_COUNTERS(alternatingNextSkip_prefetch1, counters, iters) { + fixture().alternatingNextSkip(1, counters, iters); +} + +int main(int argc, char** argv) { + folly::init(&argc, &argv); + memory::MemoryManager::initialize({}); + registerParquetReaderFactory(); + SkipBenchmark fixtureInstance; + gFixture = &fixtureInstance; + + folly::runBenchmarks(); + + return 0; +} diff --git a/bolt/dwio/parquet/tests/reader/ParquetReaderTest.cpp b/bolt/dwio/parquet/tests/reader/ParquetReaderTest.cpp index 16ffdd7ef..8f1e95600 100644 --- a/bolt/dwio/parquet/tests/reader/ParquetReaderTest.cpp +++ b/bolt/dwio/parquet/tests/reader/ParquetReaderTest.cpp @@ -1175,6 +1175,183 @@ TEST_F(ParquetReaderTest, prefetchRowGroups) { } } +// Tests for ParquetRowReader::skip across row-group boundaries. Uses +// multiple_row_groups.parquet which has 4 row groups with sizes +// {127, 127, 127, 118} and a single BIGINT column "id" holding values 1..499 +// in order. +TEST_F(ParquetReaderTest, skipAcrossRowGroups) { + auto rowType = ROW({"id"}, {BIGINT()}); + const std::string sample(getExampleFilePath("multiple_row_groups.parquet")); + + // Row-group row counts and cumulative start offsets (0-based row index). 
+  // rg0: rows [0, 127), ids [1, 127]
+  // rg1: rows [127, 254), ids [128, 254]
+  // rg2: rows [254, 381), ids [255, 381]
+  // rg3: rows [381, 499), ids [382, 499]
+
+  auto makeReader = [&]() {
+    bytedance::bolt::dwio::common::ReaderOptions readerOptions{leafPool_.get()};
+    auto reader = createReader(sample, readerOptions);
+    RowReaderOptions rowReaderOpts;
+    rowReaderOpts.setScanSpec(makeScanSpec(rowType));
+    return reader->createRowReader(rowReaderOpts);
+  };
+
+  auto expectNextIds =
+      [&](dwio::common::RowReader& rowReader, int64_t firstId, int64_t count) {
+        VectorPtr result = BaseVector::create(rowType, 0, leafPool_.get());
+        int64_t remaining = count;
+        int64_t nextExpected = firstId;
+        while (remaining > 0) {
+          auto n = rowReader.next(remaining, result);
+          ASSERT_GT(n, 0);
+          ASSERT_EQ(result->size(), n);
+          auto* rowVec = result->asUnchecked<RowVector>();
+          auto* idVec = rowVec->childAt(0)->asFlatVector<int64_t>();
+          ASSERT_NE(idVec, nullptr);
+          for (vector_size_t i = 0; i < n; ++i) {
+            ASSERT_FALSE(idVec->isNullAt(i));
+            EXPECT_EQ(idVec->valueAt(i), nextExpected);
+            ++nextExpected;
+          }
+          remaining -= n;
+        }
+      };
+
+  // 1) Skip inside the first row group only (skip 0 whole row groups).
+  {
+    auto rowReader = makeReader();
+    EXPECT_EQ(rowReader->skip(10), 10);
+    // Next id should be 11 (1-based), i.e. the 11th row.
+    expectNextIds(*rowReader, /*firstId=*/11, /*count=*/5);
+  }
+
+  // 2) Skip exactly 1 whole row group. Skip 127 rows -> land at start of rg1
+  // whose first id is 128.
+  {
+    auto rowReader = makeReader();
+    EXPECT_EQ(rowReader->skip(127), 127);
+    expectNextIds(*rowReader, /*firstId=*/128, /*count=*/10);
+  }
+
+  // 3) Skip 3 whole row groups. Skip 127*3=381 rows -> land at start of rg3,
+  // whose first id is 382.
+  {
+    auto rowReader = makeReader();
+    EXPECT_EQ(rowReader->skip(381), 381);
+    expectNextIds(*rowReader, /*firstId=*/382, /*count=*/10);
+  }
+
+  // 4) Skip landing at different positions inside a new row group.
+ // a) Landing at the very start of rg2 (skip = 254). + { + auto rowReader = makeReader(); + EXPECT_EQ(rowReader->skip(254), 254); + expectNextIds(*rowReader, /*firstId=*/255, /*count=*/5); + } + // b) Landing in the middle of rg2 (skip = 254 + 60 = 314). + { + auto rowReader = makeReader(); + EXPECT_EQ(rowReader->skip(314), 314); + // 315th row -> id 315. + expectNextIds(*rowReader, /*firstId=*/315, /*count=*/5); + } + // c) Landing at the last row of rg2 (skip = 254 + 126 = 380). + // Next read should yield id 381 (1 row), then cross into rg3. + { + auto rowReader = makeReader(); + EXPECT_EQ(rowReader->skip(380), 380); + expectNextIds(*rowReader, /*firstId=*/381, /*count=*/1); + // Now we should be at start of rg3. + expectNextIds(*rowReader, /*firstId=*/382, /*count=*/5); + } + + // 5) Skip size larger than total remaining rows: reader should skip all + // available rows, return the actual skipped count, and subsequent + // next() should return 0. + { + auto rowReader = makeReader(); + EXPECT_EQ(rowReader->skip(1000), 499); + VectorPtr result = BaseVector::create(rowType, 0, leafPool_.get()); + EXPECT_EQ(rowReader->next(100, result), 0); + } + + // 5b) Partial consumption, then skip past end. + { + auto rowReader = makeReader(); + VectorPtr result = BaseVector::create(rowType, 0, leafPool_.get()); + // Read 50 rows from rg0. + auto n = rowReader->next(50, result); + EXPECT_EQ(n, 50); + // Only 449 rows remain; requesting to skip 10000 should skip exactly 449. + EXPECT_EQ(rowReader->skip(10000), 449); + EXPECT_EQ(rowReader->next(100, result), 0); + } +} + +// Verifies that skipping entire row groups does not trigger I/O for those +// row groups, while a subsequent next() only reads the single landing row +// group. 
+TEST_F(ParquetReaderTest, skipAvoidsRowGroupIo) {
+  auto rowType = ROW({"id"}, {BIGINT()});
+  const std::string sample(getExampleFilePath("multiple_row_groups.parquet"));
+
+  bytedance::bolt::dwio::common::ReaderOptions readerOptions{leafPool_.get()};
+  // Ensure we only prefetch the current row group, so each advance triggers
+  // at most one row-group load.
+  readerOptions.setPrefetchRowGroups(0);
+  // Disable preload of the whole file so we can observe per-row-group I/O.
+  // Also shrink the footer estimate so that opening the file only reads the
+  // footer instead of the whole file.
+  readerOptions.setFilePreloadThreshold(0);
+  readerOptions.setFooterEstimatedSize(16);
+
+  auto ioStats = std::make_shared<io::IoStatistics>();
+  auto input = std::make_unique<dwio::common::BufferedInput>(
+      std::make_shared<LocalReadFile>(sample),
+      readerOptions.getMemoryPool(),
+      dwio::common::MetricsLog::voidLog(),
+      ioStats.get());
+  auto reader = std::make_unique<ParquetReader>(
+      std::move(input), readerOptions);
+
+  RowReaderOptions rowReaderOpts;
+  rowReaderOpts.setScanSpec(makeScanSpec(rowType));
+  auto rowReader = reader->createRowReader(rowReaderOpts);
+
+  // After ParquetRowReader construction, the first row group (rg0) gets
+  // scheduled and loaded. Record the I/O cost as the per-row-group baseline.
+  const uint64_t bytesAfterOpen = ioStats->rawBytesRead();
+  EXPECT_GT(bytesAfterOpen, 0);
+
+  // Skip the remainder of rg0 (127 rows) + entire rg1 (127 rows) = 254 rows.
+  // Since rg0 is already loaded, the in-group skip just advances the read
+  // offset, and rg1 is fully skipped without being scheduled for I/O. As a
+  // result, rawBytesRead must not change.
+  EXPECT_EQ(rowReader->skip(254), 254);
+  EXPECT_EQ(ioStats->rawBytesRead(), bytesAfterOpen)
+      << "skip() must not issue I/O for row groups that are fully skipped";
+
+  // next() should trigger loading of exactly one row group (rg2).
+  VectorPtr result = BaseVector::create(rowType, 0, leafPool_.get());
+  auto n = rowReader->next(10, result);
+  EXPECT_EQ(n, 10);
+  auto* rowVec = result->asUnchecked<RowVector>();
+  auto* idVec = rowVec->childAt(0)->asFlatVector<int64_t>();
+  ASSERT_NE(idVec, nullptr);
+  EXPECT_EQ(idVec->valueAt(0), 255);
+
+  const uint64_t bytesAfterNext = ioStats->rawBytesRead();
+  const uint64_t deltaForLanding = bytesAfterNext - bytesAfterOpen;
+  EXPECT_GT(deltaForLanding, 0);
+  // next() after skip should only load the single landing row group (rg2).
+  // rg2 has the same size as rg0 (both 127 rows with the same schema), so
+  // the delta should be roughly the size of one row group. Guard against
+  // accidentally loading more than one additional row group.
+  EXPECT_LT(deltaForLanding, bytesAfterOpen)
+      << "next() after skip should only load the single landing row group";
+}
+
 TEST_F(ParquetReaderTest, testEmptyRowGroups) {
   // empty_row_groups.parquet contains empty row groups
   const std::string sample(getExampleFilePath("empty_row_groups.parquet"));