diff --git a/bolt/common/base/BitUtil.h b/bolt/common/base/BitUtil.h index 338bb451c..39a1d37e4 100644 --- a/bolt/common/base/BitUtil.h +++ b/bolt/common/base/BitUtil.h @@ -129,6 +129,19 @@ constexpr inline uint64_t nwords(int32_t bits) { return roundUp(bits, 64) / 64; } +constexpr inline uint64_t nwords(uint32_t bits) { + return roundUp(bits, 64) / 64; +} + +constexpr inline uint64_t nwords(int64_t bits) { + return bits <= 0 ? 0 + : roundUp(static_cast(bits), 64) / 64; +} + +constexpr inline uint64_t nwords(uint64_t bits) { + return roundUp(bits, 64) / 64; +} + inline int32_t getAndClearLastSetBit(uint16_t& bits) { int32_t trailingZeros = __builtin_ctz(bits); // erase last non-zero bit diff --git a/bolt/common/base/tests/BitUtilTest.cpp b/bolt/common/base/tests/BitUtilTest.cpp index 4c35280a5..4a8860e2f 100644 --- a/bolt/common/base/tests/BitUtilTest.cpp +++ b/bolt/common/base/tests/BitUtilTest.cpp @@ -228,6 +228,7 @@ TEST_F(BitUtilTest, nwords) { EXPECT_EQ(nwords(63), 1); EXPECT_EQ(nwords(64), 1); EXPECT_EQ(nwords(65), 2); + EXPECT_EQ(nwords(uint32_t{65}), 2); } TEST_F(BitUtilTest, setBits) { diff --git a/bolt/dwio/parquet/reader/PageReader.cpp b/bolt/dwio/parquet/reader/PageReader.cpp index 648a352c9..055e69c2e 100644 --- a/bolt/dwio/parquet/reader/PageReader.cpp +++ b/bolt/dwio/parquet/reader/PageReader.cpp @@ -281,7 +281,7 @@ void PageReader::readPageDefLevels() { wideDefineDecoder_, "parquet read error with maxDefine = {}", maxDefine_); wideDefineDecoder_->GetBatch(definitionLevels_.data(), numRepDefsInPage_); leafNulls_.resize(bits::nwords(numRepDefsInPage_)); - leafNullsSize_ = getLengthsAndNulls( + numRowsInPage_ = getLengthsAndNulls( LevelMode::kNulls, leafInfo_, 0, @@ -290,13 +290,14 @@ void PageReader::readPageDefLevels() { nullptr, leafNulls_.data(), 0); - numRowsInPage_ = leafNullsSize_; + leafNullsSize_ = numRowsInPage_; numLeafNullsConsumed_ = 0; } void PageReader::updateRowInfoAfterPageSkipped() { rowOfPage_ += numRowsInPage_; if (hasChunkRepDefs_) { + BOLT_CHECK_GE(rowOfPage_, 0); numLeafNullsConsumed_ = rowOfPage_; } } @@ -930,7 +931,7 @@ void PageReader::decodeRepDefsFromBuffer() { const auto& repDefData = preloadedRepDefs_.front(); const auto* rawData = repDefData.data(); constexpr int32_t WordBits = 64; - size_t erasedBits = erasedLeafNullWords_ * WordBits; + int64_t erasedBits = erasedLeafNullWords_ * WordBits; BOLT_CHECK_LE(numLeafNullsConsumed_, leafNullsSize_ + erasedBits); // clear consumed nulls if (numLeafNullsConsumed_ - erasedBits > WordBits) { @@ -1038,7 +1039,7 @@ int32_t PageReader::getLengthsAndNulls( int32_t maxItems, int32_t* lengths, uint64_t* nulls, - int32_t nullsStartIndex) const { + int64_t nullsStartIndex) const { arrow::ValidityBitmapInputOutput bits; bits.values_read_upper_bound = maxItems; bits.values_read = 0; @@ -1075,7 +1076,12 @@ int32_t PageReader::getLengthsAndNulls( break; } } - return bits.values_read; + BOLT_CHECK( + bits.values_read >= 0 && bits.values_read <= maxItems, + "values_read out of range: {}, maxItems: {}", + bits.values_read, + maxItems); + return static_cast(bits.values_read); } void PageReader::makeDecoder() { @@ -1153,6 +1159,7 @@ void PageReader::skip(int64_t numRows) { if (firstUnvisited_ + numRows >= rowOfPage_ + numRowsInPage_) { seekToPage(firstUnvisited_ + numRows); if (hasChunkRepDefs_) { + BOLT_CHECK_GE(rowOfPage_, 0); numLeafNullsConsumed_ = rowOfPage_; } toSkip -= rowOfPage_ - firstUnvisited_; @@ -1256,6 +1263,7 @@ PageReader::readNulls(int32_t numValues, BufferPtr& buffer) { buffer = nullptr; return nullptr; } + BOLT_CHECK_GE(numValues, 0); dwio::common::ensureCapacity(buffer, numValues, &pool_); if (isTopLevel_) { BOLT_CHECK_EQ(1, maxDefine_); @@ -1264,12 +1272,29 @@ PageReader::readNulls(int32_t numValues, BufferPtr& buffer) { numValues, buffer->asMutable(), &allOnes); return allOnes ? nullptr : buffer->as(); } + + const int64_t erasedBits = erasedLeafNullWords_ * 64; + const int64_t relativeConsumed = numLeafNullsConsumed_ - erasedBits; + BOLT_CHECK( + relativeConsumed >= 0 && leafNullsSize_ >= numValues && + relativeConsumed <= leafNullsSize_ - numValues, + "invalid leafNulls range in readNulls(non-top): maxDefine_={} " + "numValues={} numLeafNullsConsumed_={} erasedLeafNullWords_={} " + "erasedBits={} relativeConsumed={} leafNullsSize_={} leafNullsWords={}", + maxDefine_, + numValues, + numLeafNullsConsumed_, + erasedLeafNullWords_, + erasedBits, + relativeConsumed, + leafNullsSize_, + leafNulls_.size()); bits::copyBits( leafNulls_.data(), - numLeafNullsConsumed_ - erasedLeafNullWords_ * 64, + static_cast(relativeConsumed), buffer->asMutable(), 0, - numValues); + static_cast(numValues)); numLeafNullsConsumed_ += numValues; return buffer->as(); } @@ -1298,6 +1323,7 @@ bool PageReader::rowsForPage( if (rowZero >= rowOfPage_ + numRowsInPage_) { seekToPage(rowZero); if (hasChunkRepDefs_) { + BOLT_CHECK_GE(rowOfPage_, 0); numLeafNullsConsumed_ = rowOfPage_; } } diff --git a/bolt/dwio/parquet/reader/PageReader.h b/bolt/dwio/parquet/reader/PageReader.h index 80e0a3e38..7cca41acf 100644 --- a/bolt/dwio/parquet/reader/PageReader.h +++ b/bolt/dwio/parquet/reader/PageReader.h @@ -47,7 +47,6 @@ #include namespace bytedance::bolt::parquet { - constexpr int16_t kNonPageOrdinal = static_cast(-1); constexpr uint32_t kDefaultMaxPageHeaderSize = 16 * 1024 * 1024; @@ -139,7 +138,7 @@ class PageReader { int32_t maxItems, int32_t* FOLLY_NULLABLE lengths, uint64_t* FOLLY_NULLABLE nulls, - int32_t nullsStartIndex) const; + int64_t nullsStartIndex) const; /// Applies 'visitor' to values in the ColumnChunk of 'this'. The /// operation to perform and The operand rows are given by @@ -519,7 +518,7 @@ class PageReader { raw_vector repetitionLevels_; // Number of valid bits in 'leafNulls_' - int32_t leafNullsSize_{0}; + int64_t leafNullsSize_{0}; // Number of leaf nulls read. int64_t numLeafNullsConsumed_{0};