Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions bolt/common/base/BitUtil.h
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,19 @@ constexpr inline uint64_t nwords(int32_t bits) {
return roundUp(bits, 64) / 64;
}

/// Returns the number of 64-bit words needed to hold 'bits' bits.
///
/// The bit count is widened to 64 bits before the rounding bias is added, so
/// inputs within 63 of UINT32_MAX cannot wrap around in 32-bit arithmetic and
/// the result is exact for every uint32_t value.
constexpr inline uint64_t nwords(uint32_t bits) {
  return (static_cast<uint64_t>(bits) + 63) / 64;
}

/// Returns the number of 64-bit words needed to hold 'bits' bits.
/// Zero and negative bit counts yield 0.
constexpr inline uint64_t nwords(int64_t bits) {
  if (bits <= 0) {
    return 0;
  }
  // The count is strictly positive here, so converting to unsigned keeps its
  // value; the round-up is then carried out on uint64_t.
  const auto bitCount = static_cast<uint64_t>(bits);
  return roundUp<uint64_t>(bitCount, 64) / 64;
}

/// Returns the number of 64-bit words needed to hold 'bits' bits.
///
/// Uses quotient plus a remainder check instead of rounding 'bits' up to a
/// multiple of 64 first: for inputs within 63 of UINT64_MAX the rounded-up
/// value does not fit in uint64_t, whereas this form never overflows.
constexpr inline uint64_t nwords(uint64_t bits) {
  return bits / 64 + (bits % 64 != 0 ? 1 : 0);
}

inline int32_t getAndClearLastSetBit(uint16_t& bits) {
int32_t trailingZeros = __builtin_ctz(bits);
// erase last non-zero bit
Expand Down
1 change: 1 addition & 0 deletions bolt/common/base/tests/BitUtilTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@ TEST_F(BitUtilTest, nwords) {
EXPECT_EQ(nwords(63), 1);
EXPECT_EQ(nwords(64), 1);
EXPECT_EQ(nwords(65), 2);
EXPECT_EQ(nwords(uint32_t{65}), 2);
}

TEST_F(BitUtilTest, setBits) {
Expand Down
40 changes: 33 additions & 7 deletions bolt/dwio/parquet/reader/PageReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ void PageReader::readPageDefLevels() {
wideDefineDecoder_, "parquet read error with maxDefine = {}", maxDefine_);
wideDefineDecoder_->GetBatch(definitionLevels_.data(), numRepDefsInPage_);
leafNulls_.resize(bits::nwords(numRepDefsInPage_));
leafNullsSize_ = getLengthsAndNulls(
numRowsInPage_ = getLengthsAndNulls(
LevelMode::kNulls,
leafInfo_,
0,
Expand All @@ -290,13 +290,14 @@ void PageReader::readPageDefLevels() {
nullptr,
leafNulls_.data(),
0);
numRowsInPage_ = leafNullsSize_;
leafNullsSize_ = numRowsInPage_;
Comment on lines 284 to +293
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why was the order of these two assignments changed?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

getLengthsAndNulls returns int32_t, while leafNullsSize_ is now int64_t; with this order no cast is needed.

numLeafNullsConsumed_ = 0;
}

void PageReader::updateRowInfoAfterPageSkipped() {
rowOfPage_ += numRowsInPage_;
if (hasChunkRepDefs_) {
BOLT_CHECK_GE(rowOfPage_, 0);
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we use __builtin_add_overflow for the rowOfPage_ update?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A compiler-specific builtin does not seem necessary here, since it would not provide much practical benefit for this case.

numLeafNullsConsumed_ = rowOfPage_;
}
}
Expand Down Expand Up @@ -930,7 +931,7 @@ void PageReader::decodeRepDefsFromBuffer() {
const auto& repDefData = preloadedRepDefs_.front();
const auto* rawData = repDefData.data();
constexpr int32_t WordBits = 64;
size_t erasedBits = erasedLeafNullWords_ * WordBits;
int64_t erasedBits = erasedLeafNullWords_ * WordBits;
BOLT_CHECK_LE(numLeafNullsConsumed_, leafNullsSize_ + erasedBits);
// clear consumed nulls
if (numLeafNullsConsumed_ - erasedBits > WordBits) {
Expand Down Expand Up @@ -1038,7 +1039,7 @@ int32_t PageReader::getLengthsAndNulls(
int32_t maxItems,
int32_t* lengths,
uint64_t* nulls,
int32_t nullsStartIndex) const {
int64_t nullsStartIndex) const {
arrow::ValidityBitmapInputOutput bits;
bits.values_read_upper_bound = maxItems;
bits.values_read = 0;
Expand Down Expand Up @@ -1075,7 +1076,12 @@ int32_t PageReader::getLengthsAndNulls(
break;
}
}
return bits.values_read;
BOLT_CHECK(
bits.values_read >= 0 && bits.values_read <= maxItems,
"values_read out of range: {}, maxItems: {}",
bits.values_read,
maxItems);
return static_cast<int32_t>(bits.values_read);
}

void PageReader::makeDecoder() {
Expand Down Expand Up @@ -1153,6 +1159,7 @@ void PageReader::skip(int64_t numRows) {
if (firstUnvisited_ + numRows >= rowOfPage_ + numRowsInPage_) {
seekToPage(firstUnvisited_ + numRows);
if (hasChunkRepDefs_) {
BOLT_CHECK_GE(rowOfPage_, 0);
numLeafNullsConsumed_ = rowOfPage_;
}
toSkip -= rowOfPage_ - firstUnvisited_;
Expand Down Expand Up @@ -1256,6 +1263,7 @@ PageReader::readNulls(int32_t numValues, BufferPtr& buffer) {
buffer = nullptr;
return nullptr;
}
BOLT_CHECK_GE(numValues, 0);
dwio::common::ensureCapacity<bool>(buffer, numValues, &pool_);
if (isTopLevel_) {
BOLT_CHECK_EQ(1, maxDefine_);
Expand All @@ -1264,12 +1272,29 @@ PageReader::readNulls(int32_t numValues, BufferPtr& buffer) {
numValues, buffer->asMutable<uint64_t>(), &allOnes);
return allOnes ? nullptr : buffer->as<uint64_t>();
}

const int64_t erasedBits = erasedLeafNullWords_ * 64;
const int64_t relativeConsumed = numLeafNullsConsumed_ - erasedBits;
BOLT_CHECK(
relativeConsumed >= 0 && leafNullsSize_ >= numValues &&
relativeConsumed <= leafNullsSize_ - numValues,
"invalid leafNulls range in readNulls(non-top): maxDefine_={} "
"numValues={} numLeafNullsConsumed_={} erasedLeafNullWords_={} "
"erasedBits={} relativeConsumed={} leafNullsSize_={} leafNullsWords={}",
maxDefine_,
numValues,
numLeafNullsConsumed_,
erasedLeafNullWords_,
erasedBits,
relativeConsumed,
leafNullsSize_,
leafNulls_.size());
bits::copyBits(
leafNulls_.data(),
numLeafNullsConsumed_ - erasedLeafNullWords_ * 64,
static_cast<uint64_t>(relativeConsumed),
buffer->asMutable<uint64_t>(),
0,
numValues);
static_cast<uint64_t>(numValues));
numLeafNullsConsumed_ += numValues;
return buffer->as<uint64_t>();
}
Expand Down Expand Up @@ -1298,6 +1323,7 @@ bool PageReader::rowsForPage(
if (rowZero >= rowOfPage_ + numRowsInPage_) {
seekToPage(rowZero);
if (hasChunkRepDefs_) {
BOLT_CHECK_GE(rowOfPage_, 0);
numLeafNullsConsumed_ = rowOfPage_;
}
}
Expand Down
5 changes: 2 additions & 3 deletions bolt/dwio/parquet/reader/PageReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@

#include <arrow/util/rle_encoding.h>
namespace bytedance::bolt::parquet {

constexpr int16_t kNonPageOrdinal = static_cast<int16_t>(-1);
constexpr uint32_t kDefaultMaxPageHeaderSize = 16 * 1024 * 1024;

Expand Down Expand Up @@ -139,7 +138,7 @@ class PageReader {
int32_t maxItems,
int32_t* FOLLY_NULLABLE lengths,
uint64_t* FOLLY_NULLABLE nulls,
int32_t nullsStartIndex) const;
int64_t nullsStartIndex) const;

/// Applies 'visitor' to values in the ColumnChunk of 'this'. The
/// operation to perform and The operand rows are given by
Expand Down Expand Up @@ -519,7 +518,7 @@ class PageReader {
raw_vector<int16_t> repetitionLevels_;

// Number of valid bits in 'leafNulls_'
int32_t leafNullsSize_{0};
int64_t leafNullsSize_{0};

// Number of leaf nulls read.
int64_t numLeafNullsConsumed_{0};
Expand Down