Skip to content

Commit 7e3f4b2

Browse files
tanjialiang authored and
facebook-github-bot committed
fix: SortingWriter should not estimate 0 row for processing (#14735)
Summary: SortingWriter::outputBatchRows() returns 0 when maxOutputBytesConfig_ is less than the estimated row size, which makes a subsequent check fail. Returning 0 is never correct behavior for this method, so the byte-based row estimate is now floored at 1. Reviewed By: amitkdutta Differential Revision: D81729964
1 parent 7992424 commit 7e3f4b2

File tree

3 files changed

+133
-2
lines changed

3 files changed

+133
-2
lines changed

velox/dwio/common/SortingWriter.cpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -142,8 +142,9 @@ vector_size_t SortingWriter::outputBatchRows() {
142142
std::numeric_limits<vector_size_t>::max();
143143
if (sortBuffer_->estimateOutputRowSize().has_value() &&
144144
sortBuffer_->estimateOutputRowSize().value() != 0) {
145-
const uint64_t maxOutputRows =
146-
maxOutputBytesConfig_ / sortBuffer_->estimateOutputRowSize().value();
145+
const auto maxOutputRows = std::max<size_t>(
146+
static_cast<size_t>(1),
147+
maxOutputBytesConfig_ / sortBuffer_->estimateOutputRowSize().value());
147148
if (UNLIKELY(maxOutputRows > std::numeric_limits<vector_size_t>::max())) {
148149
return maxOutputRowsConfig_;
149150
}

velox/dwio/common/tests/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ add_executable(
4848
ReaderTest.cpp
4949
RetryTests.cpp
5050
ScanSpecTest.cpp
51+
SortingWriterTest.cpp
5152
TestBufferedInput.cpp
5253
ThrottlerTest.cpp
5354
TypeTests.cpp
Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
/*
2+
* Copyright (c) Facebook, Inc. and its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
#include "velox/dwio/common/SortingWriter.h"
18+
#include <gtest/gtest.h>
19+
20+
#include "velox/common/base/tests/GTestUtils.h"
21+
#include "velox/exec/SortBuffer.h"
22+
#include "velox/vector/tests/utils/VectorTestBase.h"
23+
24+
using namespace facebook::velox::exec;
25+
using namespace facebook::velox::memory;
26+
27+
namespace facebook::velox::dwio::common::test {
28+
29+
/// Test double for Writer: keeps every vector handed to write() and a running
/// total of the rows it has received, so a test can check what was forwarded.
class MockWriter : public Writer {
30+
public:
31+
// Puts the writer into the running state as soon as it is constructed.
MockWriter() {
32+
setState(State::kRunning);
33+
}
34+
35+
// Records the vector and adds its size to the row total.
void write(const VectorPtr& data) override {
36+
writtenData_.push_back(data);
37+
totalRowsWritten_ += data->size();
38+
}
39+
40+
// Does no work and always returns true.
bool finish() override {
41+
return true;
42+
}
43+
44+
// Intentionally a no-op.
void flush() override {}
45+
46+
// Only transitions the state to closed.
void close() override {
47+
setState(State::kClosed);
48+
}
49+
50+
// Only transitions the state to aborted.
void abort() override {
51+
setState(State::kAborted);
52+
}
53+
54+
// Total number of rows across all vectors passed to write() so far.
uint64_t getTotalRowsWritten() const {
55+
return totalRowsWritten_;
56+
}
57+
58+
private:
59+
// Every vector passed to write(), in call order.
std::vector<VectorPtr> writtenData_;
60+
// Sum of data->size() over all write() calls.
uint64_t totalRowsWritten_ = 0;
61+
};
62+
63+
/// Fixture for SortingWriter tests. Provides helpers that build a SortBuffer
/// and a small three-column input vector.
class SortingWriterTest : public testing::Test,
64+
public velox::test::VectorTestBase {
65+
protected:
66+
// Installs a MemoryManager instance built from default-constructed Options.
static void SetUpTestCase() {
67+
memory::MemoryManager::testingSetInstance(memory::MemoryManager::Options{});
68+
}
69+
70+
// Builds a SortBuffer over rows of (c0 BIGINT, c1 INTEGER, c2 VARCHAR) that
// sorts on the column at index 1 (c1).
std::unique_ptr<SortBuffer> createSortBuffer() {
71+
const RowTypePtr inputType =
72+
ROW({{"c0", BIGINT()}, {"c1", INTEGER()}, {"c2", VARCHAR()}});
73+
74+
const std::vector<column_index_t> sortColumnIndices{1};
75+
// NOTE(review): the meaning of the three positional booleans is defined by
// CompareFlags, which is not visible here -- confirm against its declaration.
const std::vector<CompareFlags> sortCompareFlags{
76+
{true, true, false, CompareFlags::NullHandlingMode::kNullAsValue}};
77+
78+
// NOTE(review): positional PrefixSortConfig arguments; their field names are
// not visible here -- confirm against the struct definition.
const velox::common::PrefixSortConfig prefixSortConfig{
79+
std::numeric_limits<uint32_t>::max(),
80+
std::numeric_limits<uint32_t>::max(),
81+
12};
82+
83+
return std::make_unique<SortBuffer>(
84+
inputType,
85+
sortColumnIndices,
86+
sortCompareFlags,
87+
pool_.get(),
88+
&nonReclaimableSection_,
89+
prefixSortConfig);
90+
}
91+
92+
// Produces numRows rows where c0 = row index, c1 = numRows - row index and
// c2 = "row_<row index>".
RowVectorPtr createTestData(uint64_t numRows = 1000) {
93+
return makeRowVector(
94+
{makeFlatVector<int64_t>(
95+
numRows, [](vector_size_t row) { return row; }),
96+
makeFlatVector<int32_t>(
97+
numRows, [numRows](vector_size_t row) { return numRows - row; }),
98+
makeFlatVector<std::string>(numRows, [](vector_size_t row) {
99+
return fmt::format("row_{}", row);
100+
})});
101+
}
102+
103+
// Flag whose address is handed to every SortBuffer created by this fixture.
tsan_atomic<bool> nonReclaimableSection_{false};
104+
};
105+
106+
// Regression test for the case described in the commit summary: with a byte
// budget of 1, the budget is smaller than the estimated row size. The writer
// must still finish and forward rows to the wrapped writer.
TEST_F(SortingWriterTest, largeRowSizeExceedsMaxOutputBytes) {
107+
auto mockWriter = std::make_unique<MockWriter>();
108+
// Raw pointer kept so the mock can be inspected after ownership is moved
// into the SortingWriter below.
auto mockWriterPtr = mockWriter.get();
109+
110+
auto sortBuffer = createSortBuffer();
111+
112+
const vector_size_t maxOutputRowsConfig = 1000;
113+
// Deliberately tiny byte budget (1 byte).
const uint64_t maxOutputBytesConfig = 1;
114+
const uint64_t outputTimeSliceLimitMs = 1000;
115+
116+
SortingWriter sortingWriter(
117+
std::move(mockWriter),
118+
std::move(sortBuffer),
119+
maxOutputRowsConfig,
120+
maxOutputBytesConfig,
121+
outputTimeSliceLimitMs);
122+
123+
// Write 10 rows, finish, and require that the mock received at least one row.
RowVectorPtr testData = createTestData(10);
124+
sortingWriter.write(testData);
125+
ASSERT_TRUE(sortingWriter.finish());
126+
ASSERT_GT(mockWriterPtr->getTotalRowsWritten(), 0);
127+
}
128+
129+
} // namespace facebook::velox::dwio::common::test

0 commit comments

Comments
 (0)