Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions bolt/common/time/Timer.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class MicrosecondTimer {
}

~MicrosecondTimer() {
if (FOLLY_LIKELY(timer_)) {
if (FOLLY_LIKELY(timer_ != nullptr)) {
auto duration = std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::steady_clock::now() - start_);

Expand All @@ -72,7 +72,7 @@ class NanosecondTimer {
}

~NanosecondTimer() {
if (FOLLY_LIKELY(timer_)) {
if (FOLLY_LIKELY(timer_ != nullptr)) {
auto duration = std::chrono::duration_cast<std::chrono::nanoseconds>(
std::chrono::steady_clock::now() - start_);

Expand Down
2 changes: 2 additions & 0 deletions bolt/exec/Driver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1219,6 +1219,8 @@ std::string blockingReasonToString(BlockingReason reason) {
return "kWaitForScanScaleUp";
case BlockingReason::kWaitForIndexLookup:
return "kWaitForIndexLookup";
case BlockingReason::kWaitForShuffle:
return "kWaitForShuffle";
default:
BOLT_UNREACHABLE(
fmt::format("Unknown blocking reason {}", static_cast<int>(reason)));
Expand Down
3 changes: 3 additions & 0 deletions bolt/exec/Driver.h
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,9 @@ enum class BlockingReason {
/// Used by IndexLookupJoin operator, indicating that it was blocked by the
/// async index lookup.
kWaitForIndexLookup,
/// Used by SparkShuffleWriter to wait for peer SparkShuffleWriter operators
/// in the same pipeline to finish finalization.
kWaitForShuffle,
};

std::string blockingReasonToString(BlockingReason reason);
Expand Down
5 changes: 0 additions & 5 deletions bolt/exec/LocalPlanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -376,11 +376,6 @@ uint32_t maxDrivers(
// single-threaded for now. This assumption might not hold in the future.
else if (node->name() == "SparkShuffleReader") {
return 1;
}
// multi-threaded spark: SparkShuffleWriter is designed to be
// single-threaded for now. This assumption might not hold in the future.
else if (node->name() == "SparkShuffleWriter") {
return 1;
} else {
auto result = Operator::maxDrivers(node);
if (result) {
Expand Down
15 changes: 14 additions & 1 deletion bolt/shuffle/sparksql/BoltRowBasedSortShuffleWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -209,11 +209,24 @@ arrow::Status BoltRowBasedSortShuffleWriter::reclaimFixedSize(
}

// Final stop: flushes remaining data and fully finalizes the partition
// writer (delegates to stopInternal with finalization enabled).
arrow::Status BoltRowBasedSortShuffleWriter::stop() {
  constexpr bool kFinalizePartitionWriter = true;
  return stopInternal(kFinalizePartitionWriter);
}

// Local stop: collects this writer's metrics without finalizing the
// partition writer (delegates to stopInternal with finalization disabled).
// Presumably used when peer writers in the same pipeline still need the
// partition writer — confirm against SparkShuffleWriter callers.
arrow::Status BoltRowBasedSortShuffleWriter::localStop() {
  constexpr bool kFinalizePartitionWriter = false;
  return stopInternal(kFinalizePartitionWriter);
}

arrow::Status BoltRowBasedSortShuffleWriter::stopInternal(
bool stopPartitionWriter) {
bytedance::bolt::NanosecondTimer stopTimer(&stopTime_);
setSplitState(SplitState::kStop);
RETURN_NOT_OK(tryEvict());
{
RETURN_NOT_OK(partitionWriter_->stop(&metrics_));
if (stopPartitionWriter) {
RETURN_NOT_OK(partitionWriter_->stop(&metrics_));
} else {
RETURN_NOT_OK(partitionWriter_->populateMetrics(&metrics_));
}
metrics_.useRowBased = 1;
combinedVectorNumber_ = combineVectorTimes_ > 0
? (combinedVectorNumber_ / combineVectorTimes_)
Expand Down
4 changes: 4 additions & 0 deletions bolt/shuffle/sparksql/BoltRowBasedSortShuffleWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ class BoltRowBasedSortShuffleWriter final : public BoltShuffleWriter {

arrow::Status stop() override;

arrow::Status localStop() override;

arrow::Status reclaimFixedSize(int64_t size, int64_t* actual) override;

BoltRowBasedSortShuffleWriter(
Expand All @@ -52,6 +54,8 @@ class BoltRowBasedSortShuffleWriter final : public BoltShuffleWriter {
: BoltShuffleWriter(std::move(options), boltPool, pool) {}

private:
arrow::Status stopInternal(bool stopPartitionWriter);

arrow::Status init() override;

arrow::Status initFromRowVector(
Expand Down
22 changes: 20 additions & 2 deletions bolt/shuffle/sparksql/BoltShuffleWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -684,6 +684,14 @@ arrow::Status BoltShuffleWriter::split(
}

// Final stop: fully finalizes the partition writer
// (delegates to stopInternal with finalization enabled).
arrow::Status BoltShuffleWriter::stop() {
  constexpr bool kFinalizePartitionWriter = true;
  return stopInternal(kFinalizePartitionWriter);
}

// Local stop: populates this writer's metrics without finalizing the
// partition writer (delegates to stopInternal with finalization disabled).
arrow::Status BoltShuffleWriter::localStop() {
  constexpr bool kFinalizePartitionWriter = false;
  return stopInternal(kFinalizePartitionWriter);
}

arrow::Status BoltShuffleWriter::stopInternal(bool stopPartitionWriter) {
bytedance::bolt::NanosecondTimer stopTimer(&stopTime_);
if (vectorLayout_ != RowVectorLayout::kComposite) {
partitionWriter_->setRowFormat(false);
Expand Down Expand Up @@ -720,9 +728,15 @@ arrow::Status BoltShuffleWriter::stop() {
{
SCOPED_TIMER(cpuWallTimingList_[CpuWallTimingStop]);
setSplitState(SplitState::kStop);
RETURN_NOT_OK(partitionWriter_->stop(&metrics_));
if (stopPartitionWriter) {
RETURN_NOT_OK(partitionWriter_->stop(&metrics_));
} else {
RETURN_NOT_OK(partitionWriter_->populateMetrics(&metrics_));
}
metrics_.avgPreallocSize =
(preallocCount_ == 0) ? 0 : totalPreallocSize_ / preallocCount_;
metrics_.totalPreallocSize = totalPreallocSize_;
metrics_.totalPreallocCount = preallocCount_;
partitionBuffers_.clear();
}

Expand All @@ -732,7 +746,11 @@ arrow::Status BoltShuffleWriter::stop() {
RETURN_NOT_OK(tryEvict());
{
SCOPED_TIMER(cpuWallTimingList_[CpuWallTimingStop]);
RETURN_NOT_OK(partitionWriter_->stop(&metrics_));
if (stopPartitionWriter) {
RETURN_NOT_OK(partitionWriter_->stop(&metrics_));
} else {
RETURN_NOT_OK(partitionWriter_->populateMetrics(&metrics_));
}
}
}
metrics_.useV2 = 0;
Expand Down
5 changes: 5 additions & 0 deletions bolt/shuffle/sparksql/BoltShuffleWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,9 @@ class BoltShuffleWriter : public ShuffleWriter {

arrow::Status stop() override;

// Stops this writer locally without finalizing the shared partition writer;
// only populates metrics (used when peer writers in the pipeline are still
// active).
arrow::Status localStop() override;

arrow::Status reclaimFixedSize(int64_t size, int64_t* actual) override;

const uint64_t cachedPayloadSize() const override;
Expand Down Expand Up @@ -512,6 +515,8 @@ class BoltShuffleWriter : public ShuffleWriter {
// for CompositeRowVector
virtual arrow::Status tryEvict(
int64_t memLimit = std::numeric_limits<int64_t>::max());

arrow::Status stopInternal(bool stopPartitionWriter);
arrow::Status initFromRowVectorForComposite(
const bytedance::bolt::RowVector& rv);
virtual arrow::Status splitCompositeVector(
Expand Down
20 changes: 18 additions & 2 deletions bolt/shuffle/sparksql/BoltShuffleWriterV2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,14 @@ arrow::Status BoltShuffleWriterV2::initPartitions() {
}

// Final stop: fully finalizes the partition writer
// (delegates to stopInternal with finalization enabled).
arrow::Status BoltShuffleWriterV2::stop() {
  constexpr bool kFinalizePartitionWriter = true;
  return stopInternal(kFinalizePartitionWriter);
}

// Local stop: populates this writer's metrics without finalizing the
// partition writer (delegates to stopInternal with finalization disabled).
arrow::Status BoltShuffleWriterV2::localStop() {
  constexpr bool kFinalizePartitionWriter = false;
  return stopInternal(kFinalizePartitionWriter);
}

arrow::Status BoltShuffleWriterV2::stopInternal(bool stopPartitionWriter) {
bytedance::bolt::NanosecondTimer stopTimer(&stopTime_);
if (vectorLayout_ == RowVectorLayout::kColumnar) {
partitionWriter_->setRowFormat(false);
Expand All @@ -275,7 +283,11 @@ arrow::Status BoltShuffleWriterV2::stop() {
{
SCOPED_TIMER(cpuWallTimingList_[CpuWallTimingStop]);
setSplitState(SplitState::kStop);
RETURN_NOT_OK(partitionWriter_->stop(&metrics_));
if (stopPartitionWriter) {
RETURN_NOT_OK(partitionWriter_->stop(&metrics_));
} else {
RETURN_NOT_OK(partitionWriter_->populateMetrics(&metrics_));
}
metrics_.rowVectorModeCompress = rowVectorModeCompress_;
releaseBufferPoolMemory();
}
Expand All @@ -286,7 +298,11 @@ arrow::Status BoltShuffleWriterV2::stop() {
RETURN_NOT_OK(tryEvict());
{
SCOPED_TIMER(cpuWallTimingList_[CpuWallTimingStop]);
RETURN_NOT_OK(partitionWriter_->stop(&metrics_));
if (stopPartitionWriter) {
RETURN_NOT_OK(partitionWriter_->stop(&metrics_));
} else {
RETURN_NOT_OK(partitionWriter_->populateMetrics(&metrics_));
}
}
}
metrics_.useV2 = 1;
Expand Down
4 changes: 4 additions & 0 deletions bolt/shuffle/sparksql/BoltShuffleWriterV2.h
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,8 @@ class BoltShuffleWriterV2 final : public BoltShuffleWriter {

arrow::Status stop() override;

arrow::Status localStop() override;

arrow::Status reclaimFixedSize(int64_t size, int64_t* actual) override;

BoltShuffleWriterV2(
Expand All @@ -219,6 +221,8 @@ class BoltShuffleWriterV2 final : public BoltShuffleWriter {
}

private:
arrow::Status stopInternal(bool stopPartitionWriter);

void checkLengthBuffer(uint32_t col, uint32_t pid);

void checkNullValue(int32_t col, uint32_t pid, uint32_t numRows);
Expand Down
56 changes: 56 additions & 0 deletions bolt/shuffle/sparksql/Options.h
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,9 @@ struct PartitionWriterOptions {
// for LocalPartitionWriter
std::string dataFile;
std::vector<std::string> configuredDirs;
// For parallel SparkShuffleWriter + LocalPartitionWriter execution, partial
// files use a per-driver suffix when part >= 0.
int32_t part = -1;

// for CelebornPartitionWriter
std::shared_ptr<RssClient> rssClient;
Expand Down Expand Up @@ -168,6 +171,8 @@ struct ShuffleWriterMetrics {
int64_t totalEvictTime{0};
int64_t totalCompressTime{0};
int64_t maxPartitionBufferSize{0};
int64_t totalPreallocSize{0};
int64_t totalPreallocCount{0};
int64_t avgPreallocSize{0};
int64_t useV2{0};
int64_t rowVectorModeCompress{0};
Expand All @@ -184,6 +189,57 @@ struct ShuffleWriterMetrics {
int64_t dataSize{0};
std::vector<int64_t> partitionLengths{};
std::vector<int64_t> rawPartitionLengths{}; // Uncompressed size.
std::string dataFile{};

/// Accumulates another writer's metrics into this aggregate (used to merge
/// per-driver metrics from parallel SparkShuffleWriter instances).
/// Counters and timings are summed; maxPartitionBufferSize takes the max;
/// avgPreallocSize is re-derived from the accumulated totals so it remains a
/// true average across all merged writers. Per-partition length vectors are
/// added element-wise: the first non-empty rhs fixes the partition count and
/// later merges must match it (enforced by BOLT_CHECK_EQ).
/// NOTE(review): useV2 is overwritten (last merged writer wins) rather than
/// accumulated — assumed to be a mode flag identical across peers; confirm.
/// dataFile is intentionally not merged.
ShuffleWriterMetrics& operator+=(const ShuffleWriterMetrics& rhs) {
  // Element-wise sum of per-partition lengths; lazily sizes `dst` on the
  // first non-empty merge. Shared by both length vectors below.
  const auto mergeLengths = [](std::vector<int64_t>& dst,
                               const std::vector<int64_t>& src) {
    if (src.empty()) {
      return;
    }
    if (dst.empty()) {
      dst.resize(src.size(), 0);
    }
    BOLT_CHECK_EQ(dst.size(), src.size());
    for (size_t i = 0; i < dst.size(); ++i) {
      dst[i] += src[i];
    }
  };

  totalInputRowNumber += rhs.totalInputRowNumber;
  totalInputBatches += rhs.totalInputBatches;
  totalBytesWritten += rhs.totalBytesWritten;
  totalBytesEvicted += rhs.totalBytesEvicted;
  totalWriteTime += rhs.totalWriteTime;
  totalEvictTime += rhs.totalEvictTime;
  totalCompressTime += rhs.totalCompressTime;
  maxPartitionBufferSize =
      std::max(maxPartitionBufferSize, rhs.maxPartitionBufferSize);
  totalPreallocSize += rhs.totalPreallocSize;
  totalPreallocCount += rhs.totalPreallocCount;
  // Recompute from accumulated totals; guard against division by zero.
  avgPreallocSize =
      totalPreallocCount == 0 ? 0 : totalPreallocSize / totalPreallocCount;
  useV2 = rhs.useV2;
  rowVectorModeCompress += rhs.rowVectorModeCompress;
  combinedVectorNumber += rhs.combinedVectorNumber;
  combineVectorTimes += rhs.combineVectorTimes;
  combineVectorCost += rhs.combineVectorCost;
  useRowBased += rhs.useRowBased;
  splitTime += rhs.splitTime;
  convertTime += rhs.convertTime;
  flattenTime += rhs.flattenTime;
  computePidTime += rhs.computePidTime;
  shuffleWriteTime += rhs.shuffleWriteTime;
  dataSize += rhs.dataSize;

  mergeLengths(partitionLengths, rhs.partitionLengths);
  mergeLengths(rawPartitionLengths, rhs.rawPartitionLengths);

  return *this;
}
};

// Only partitioning that has pid support adaptive shuffle writer, otherwise
Expand Down
2 changes: 2 additions & 0 deletions bolt/shuffle/sparksql/ShuffleWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ class ShuffleWriter {

virtual arrow::Status stop() = 0;

virtual arrow::Status localStop() = 0;

int32_t numPartitions() const {
return numPartitions_;
}
Expand Down
Loading
Loading