diff --git a/bolt/common/time/Timer.h b/bolt/common/time/Timer.h
index 29da40d0c..1a9f16e56 100644
--- a/bolt/common/time/Timer.h
+++ b/bolt/common/time/Timer.h
@@ -49,7 +49,7 @@ class MicrosecondTimer {
   }
 
   ~MicrosecondTimer() {
-    if (FOLLY_LIKELY(timer_)) {
+    if (FOLLY_LIKELY(timer_ != nullptr)) {
       auto duration = std::chrono::duration_cast<std::chrono::microseconds>(
           std::chrono::steady_clock::now() - start_);
@@ -72,7 +72,7 @@ class NanosecondTimer {
   }
 
   ~NanosecondTimer() {
-    if (FOLLY_LIKELY(timer_)) {
+    if (FOLLY_LIKELY(timer_ != nullptr)) {
       auto duration = std::chrono::duration_cast<std::chrono::nanoseconds>(
           std::chrono::steady_clock::now() - start_);
diff --git a/bolt/exec/Driver.cpp b/bolt/exec/Driver.cpp
index 7f69c76aa..aa25cbd2d 100644
--- a/bolt/exec/Driver.cpp
+++ b/bolt/exec/Driver.cpp
@@ -1219,6 +1219,8 @@ std::string blockingReasonToString(BlockingReason reason) {
       return "kWaitForScanScaleUp";
     case BlockingReason::kWaitForIndexLookup:
       return "kWaitForIndexLookup";
+    case BlockingReason::kWaitForShuffle:
+      return "kWaitForShuffle";
     default:
       BOLT_UNREACHABLE(
           fmt::format("Unknown blocking reason {}", static_cast<int>(reason)));
diff --git a/bolt/exec/Driver.h b/bolt/exec/Driver.h
index 2380e3e8c..8a06440f6 100644
--- a/bolt/exec/Driver.h
+++ b/bolt/exec/Driver.h
@@ -220,6 +220,9 @@ enum class BlockingReason {
   /// Used by IndexLookupJoin operator, indicating that it was blocked by the
   /// async index lookup.
   kWaitForIndexLookup,
+  /// Used by SparkShuffleWriter to wait for peer SparkShuffleWriter operators
+  /// in the same pipeline to finish finalization.
+  kWaitForShuffle,
 };
 
 std::string blockingReasonToString(BlockingReason reason);
diff --git a/bolt/exec/LocalPlanner.cpp b/bolt/exec/LocalPlanner.cpp
index ac8ae87a2..c4c5c99ee 100644
--- a/bolt/exec/LocalPlanner.cpp
+++ b/bolt/exec/LocalPlanner.cpp
@@ -376,11 +376,6 @@ uint32_t maxDrivers(
   // single-threaded for now. This assumption might not hold in the future.
   else if (node->name() == "SparkShuffleReader") {
     return 1;
-  }
-  // multi-threaded spark: SparkShuffleWriter is designed to be
-  // single-threaded for now. This assumption might not hold in the future.
-  else if (node->name() == "SparkShuffleWriter") {
-    return 1;
   } else {
     auto result = Operator::maxDrivers(node);
     if (result) {
diff --git a/bolt/shuffle/sparksql/BoltRowBasedSortShuffleWriter.cpp b/bolt/shuffle/sparksql/BoltRowBasedSortShuffleWriter.cpp
index a3b76d768..675736bc5 100644
--- a/bolt/shuffle/sparksql/BoltRowBasedSortShuffleWriter.cpp
+++ b/bolt/shuffle/sparksql/BoltRowBasedSortShuffleWriter.cpp
@@ -209,11 +209,24 @@ arrow::Status BoltRowBasedSortShuffleWriter::reclaimFixedSize(
 }
 
 arrow::Status BoltRowBasedSortShuffleWriter::stop() {
+  return stopInternal(true);
+}
+
+arrow::Status BoltRowBasedSortShuffleWriter::localStop() {
+  return stopInternal(false);
+}
+
+arrow::Status BoltRowBasedSortShuffleWriter::stopInternal(
+    bool stopPartitionWriter) {
   bytedance::bolt::NanosecondTimer stopTimer(&stopTime_);
   setSplitState(SplitState::kStop);
   RETURN_NOT_OK(tryEvict());
   {
-    RETURN_NOT_OK(partitionWriter_->stop(&metrics_));
+    if (stopPartitionWriter) {
+      RETURN_NOT_OK(partitionWriter_->stop(&metrics_));
+    } else {
+      RETURN_NOT_OK(partitionWriter_->populateMetrics(&metrics_));
+    }
     metrics_.useRowBased = 1;
     combinedVectorNumber_ = combineVectorTimes_ > 0 ?
         (combinedVectorNumber_ / combineVectorTimes_)
diff --git a/bolt/shuffle/sparksql/BoltRowBasedSortShuffleWriter.h b/bolt/shuffle/sparksql/BoltRowBasedSortShuffleWriter.h
index c089ee661..a76c87b12 100644
--- a/bolt/shuffle/sparksql/BoltRowBasedSortShuffleWriter.h
+++ b/bolt/shuffle/sparksql/BoltRowBasedSortShuffleWriter.h
@@ -43,6 +43,8 @@ class BoltRowBasedSortShuffleWriter final : public BoltShuffleWriter {
 
   arrow::Status stop() override;
 
+  arrow::Status localStop() override;
+
   arrow::Status reclaimFixedSize(int64_t size, int64_t* actual) override;
 
   BoltRowBasedSortShuffleWriter(
@@ -52,6 +54,8 @@ class BoltRowBasedSortShuffleWriter final : public BoltShuffleWriter {
       : BoltShuffleWriter(std::move(options), boltPool, pool) {}
 
  private:
+  arrow::Status stopInternal(bool stopPartitionWriter);
+
   arrow::Status init() override;
 
   arrow::Status initFromRowVector(
diff --git a/bolt/shuffle/sparksql/BoltShuffleWriter.cpp b/bolt/shuffle/sparksql/BoltShuffleWriter.cpp
index 6ebb22a0c..590c646f1 100644
--- a/bolt/shuffle/sparksql/BoltShuffleWriter.cpp
+++ b/bolt/shuffle/sparksql/BoltShuffleWriter.cpp
@@ -684,6 +684,14 @@ arrow::Status BoltShuffleWriter::split(
 }
 
 arrow::Status BoltShuffleWriter::stop() {
+  return stopInternal(true);
+}
+
+arrow::Status BoltShuffleWriter::localStop() {
+  return stopInternal(false);
+}
+
+arrow::Status BoltShuffleWriter::stopInternal(bool stopPartitionWriter) {
   bytedance::bolt::NanosecondTimer stopTimer(&stopTime_);
   if (vectorLayout_ != RowVectorLayout::kComposite) {
     partitionWriter_->setRowFormat(false);
@@ -720,9 +728,15 @@ arrow::Status BoltShuffleWriter::stop() {
   {
     SCOPED_TIMER(cpuWallTimingList_[CpuWallTimingStop]);
     setSplitState(SplitState::kStop);
-    RETURN_NOT_OK(partitionWriter_->stop(&metrics_));
+    if (stopPartitionWriter) {
+      RETURN_NOT_OK(partitionWriter_->stop(&metrics_));
+    } else {
+      RETURN_NOT_OK(partitionWriter_->populateMetrics(&metrics_));
+    }
     metrics_.avgPreallocSize = (preallocCount_ == 0) ?
         0 : totalPreallocSize_ / preallocCount_;
+    metrics_.totalPreallocSize = totalPreallocSize_;
+    metrics_.totalPreallocCount = preallocCount_;
     partitionBuffers_.clear();
   }
@@ -732,7 +746,11 @@ arrow::Status BoltShuffleWriter::stop() {
     RETURN_NOT_OK(tryEvict());
     {
       SCOPED_TIMER(cpuWallTimingList_[CpuWallTimingStop]);
-      RETURN_NOT_OK(partitionWriter_->stop(&metrics_));
+      if (stopPartitionWriter) {
+        RETURN_NOT_OK(partitionWriter_->stop(&metrics_));
+      } else {
+        RETURN_NOT_OK(partitionWriter_->populateMetrics(&metrics_));
+      }
     }
   }
   metrics_.useV2 = 0;
diff --git a/bolt/shuffle/sparksql/BoltShuffleWriter.h b/bolt/shuffle/sparksql/BoltShuffleWriter.h
index 3c59e7ee9..0b967cd8d 100644
--- a/bolt/shuffle/sparksql/BoltShuffleWriter.h
+++ b/bolt/shuffle/sparksql/BoltShuffleWriter.h
@@ -189,6 +189,9 @@ class BoltShuffleWriter : public ShuffleWriter {
 
   arrow::Status stop() override;
 
+  // stop partition writer in local thread
+  arrow::Status localStop() override;
+
   arrow::Status reclaimFixedSize(int64_t size, int64_t* actual) override;
 
   const uint64_t cachedPayloadSize() const override;
@@ -512,6 +515,8 @@ class BoltShuffleWriter : public ShuffleWriter {
   // for CompositeRowVector
   virtual arrow::Status tryEvict(
       int64_t memLimit = std::numeric_limits<int64_t>::max());
+
+  arrow::Status stopInternal(bool stopPartitionWriter);
   arrow::Status initFromRowVectorForComposite(
       const bytedance::bolt::RowVector& rv);
   virtual arrow::Status splitCompositeVector(
diff --git a/bolt/shuffle/sparksql/BoltShuffleWriterV2.cpp b/bolt/shuffle/sparksql/BoltShuffleWriterV2.cpp
index f4f9e2dd2..7e8a83727 100644
--- a/bolt/shuffle/sparksql/BoltShuffleWriterV2.cpp
+++ b/bolt/shuffle/sparksql/BoltShuffleWriterV2.cpp
@@ -262,6 +262,14 @@ arrow::Status BoltShuffleWriterV2::initPartitions() {
 }
 
 arrow::Status BoltShuffleWriterV2::stop() {
+  return stopInternal(true);
+}
+
+arrow::Status BoltShuffleWriterV2::localStop() {
+  return stopInternal(false);
+}
+
+arrow::Status BoltShuffleWriterV2::stopInternal(bool stopPartitionWriter) {
   bytedance::bolt::NanosecondTimer stopTimer(&stopTime_);
   if (vectorLayout_ == RowVectorLayout::kColumnar) {
     partitionWriter_->setRowFormat(false);
@@ -275,7 +283,11 @@ arrow::Status BoltShuffleWriterV2::stop() {
   {
     SCOPED_TIMER(cpuWallTimingList_[CpuWallTimingStop]);
     setSplitState(SplitState::kStop);
-    RETURN_NOT_OK(partitionWriter_->stop(&metrics_));
+    if (stopPartitionWriter) {
+      RETURN_NOT_OK(partitionWriter_->stop(&metrics_));
+    } else {
+      RETURN_NOT_OK(partitionWriter_->populateMetrics(&metrics_));
+    }
     metrics_.rowVectorModeCompress = rowVectorModeCompress_;
     releaseBufferPoolMemory();
   }
@@ -286,7 +298,11 @@ arrow::Status BoltShuffleWriterV2::stop() {
     RETURN_NOT_OK(tryEvict());
     {
       SCOPED_TIMER(cpuWallTimingList_[CpuWallTimingStop]);
-      RETURN_NOT_OK(partitionWriter_->stop(&metrics_));
+      if (stopPartitionWriter) {
+        RETURN_NOT_OK(partitionWriter_->stop(&metrics_));
+      } else {
+        RETURN_NOT_OK(partitionWriter_->populateMetrics(&metrics_));
+      }
     }
   }
   metrics_.useV2 = 1;
diff --git a/bolt/shuffle/sparksql/BoltShuffleWriterV2.h b/bolt/shuffle/sparksql/BoltShuffleWriterV2.h
index 82172803b..7476d8359 100644
--- a/bolt/shuffle/sparksql/BoltShuffleWriterV2.h
+++ b/bolt/shuffle/sparksql/BoltShuffleWriterV2.h
@@ -206,6 +206,8 @@ class BoltShuffleWriterV2 final : public BoltShuffleWriter {
 
   arrow::Status stop() override;
 
+  arrow::Status localStop() override;
+
   arrow::Status reclaimFixedSize(int64_t size, int64_t* actual) override;
 
   BoltShuffleWriterV2(
@@ -219,6 +221,8 @@ class BoltShuffleWriterV2 final : public BoltShuffleWriter {
   }
  private:
+  arrow::Status stopInternal(bool stopPartitionWriter);
+
   void checkLengthBuffer(uint32_t col, uint32_t pid);
 
   void checkNullValue(int32_t col, uint32_t pid, uint32_t numRows);
diff --git a/bolt/shuffle/sparksql/Options.h b/bolt/shuffle/sparksql/Options.h
index dff8a2f7f..92595204b 100644
--- a/bolt/shuffle/sparksql/Options.h
+++ b/bolt/shuffle/sparksql/Options.h
@@ -132,6 +132,9 @@ struct PartitionWriterOptions {
   // for LocalPartitionWriter
   std::string dataFile;
   std::vector<std::string> configuredDirs;
+  // For parallel SparkShuffleWriter + LocalPartitionWriter execution, partial
+  // files use a per-driver suffix when part >= 0.
+  int32_t part = -1;
 
   // for CelebornPartitionWriter
   std::shared_ptr rssClient;
@@ -168,6 +171,8 @@ struct ShuffleWriterMetrics {
   int64_t totalEvictTime{0};
   int64_t totalCompressTime{0};
   int64_t maxPartitionBufferSize{0};
+  int64_t totalPreallocSize{0};
+  int64_t totalPreallocCount{0};
   int64_t avgPreallocSize{0};
   int64_t useV2{0};
   int64_t rowVectorModeCompress{0};
@@ -184,6 +189,57 @@ struct ShuffleWriterMetrics {
   int64_t dataSize{0};
   std::vector<int64_t> partitionLengths{};
   std::vector<int64_t> rawPartitionLengths{}; // Uncompressed size.
+  std::string dataFile{};
+
+  ShuffleWriterMetrics& operator+=(const ShuffleWriterMetrics& rhs) {
+    totalInputRowNumber += rhs.totalInputRowNumber;
+    totalInputBatches += rhs.totalInputBatches;
+    totalBytesWritten += rhs.totalBytesWritten;
+    totalBytesEvicted += rhs.totalBytesEvicted;
+    totalWriteTime += rhs.totalWriteTime;
+    totalEvictTime += rhs.totalEvictTime;
+    totalCompressTime += rhs.totalCompressTime;
+    maxPartitionBufferSize =
+        std::max(maxPartitionBufferSize, rhs.maxPartitionBufferSize);
+    totalPreallocSize += rhs.totalPreallocSize;
+    totalPreallocCount += rhs.totalPreallocCount;
+    avgPreallocSize =
+        totalPreallocCount == 0 ?
+            0 : totalPreallocSize / totalPreallocCount;
+    useV2 = rhs.useV2;
+    rowVectorModeCompress += rhs.rowVectorModeCompress;
+    combinedVectorNumber += rhs.combinedVectorNumber;
+    combineVectorTimes += rhs.combineVectorTimes;
+    combineVectorCost += rhs.combineVectorCost;
+    useRowBased += rhs.useRowBased;
+    splitTime += rhs.splitTime;
+    convertTime += rhs.convertTime;
+    flattenTime += rhs.flattenTime;
+    computePidTime += rhs.computePidTime;
+    shuffleWriteTime += rhs.shuffleWriteTime;
+    dataSize += rhs.dataSize;
+
+    if (!rhs.partitionLengths.empty()) {
+      if (partitionLengths.empty()) {
+        partitionLengths.resize(rhs.partitionLengths.size(), 0);
+      }
+      BOLT_CHECK_EQ(partitionLengths.size(), rhs.partitionLengths.size());
+      for (size_t i = 0; i < partitionLengths.size(); ++i) {
+        partitionLengths[i] += rhs.partitionLengths[i];
+      }
+    }
+
+    if (!rhs.rawPartitionLengths.empty()) {
+      if (rawPartitionLengths.empty()) {
+        rawPartitionLengths.resize(rhs.rawPartitionLengths.size(), 0);
+      }
+      BOLT_CHECK_EQ(rawPartitionLengths.size(), rhs.rawPartitionLengths.size());
+      for (size_t i = 0; i < rawPartitionLengths.size(); ++i) {
+        rawPartitionLengths[i] += rhs.rawPartitionLengths[i];
+      }
+    }
+
+    return *this;
+  }
 };
 
 // Only partitioning that has pid support adaptive shuffle writer, otherwise
diff --git a/bolt/shuffle/sparksql/ShuffleWriter.h b/bolt/shuffle/sparksql/ShuffleWriter.h
index 42f431f31..5bf62300b 100644
--- a/bolt/shuffle/sparksql/ShuffleWriter.h
+++ b/bolt/shuffle/sparksql/ShuffleWriter.h
@@ -55,6 +55,8 @@ class ShuffleWriter {
 
   virtual arrow::Status stop() = 0;
 
+  virtual arrow::Status localStop() = 0;
+
   int32_t numPartitions() const {
     return numPartitions_;
   }
diff --git a/bolt/shuffle/sparksql/ShuffleWriterNode.cpp b/bolt/shuffle/sparksql/ShuffleWriterNode.cpp
index 5cb3236ef..f5d3a9f51 100644
--- a/bolt/shuffle/sparksql/ShuffleWriterNode.cpp
+++ b/bolt/shuffle/sparksql/ShuffleWriterNode.cpp
@@ -15,11 +15,14 @@
  */
 #include "bolt/shuffle/sparksql/ShuffleWriterNode.h"
+#include
 #include "bolt/common/memory/sparksql/ExecutionMemoryPool.h"
+#include "bolt/exec/Task.h"
 #include "bolt/shuffle/sparksql/BoltArrowMemoryPool.h"
 #include "bolt/shuffle/sparksql/BoltRowBasedSortShuffleWriter.h"
 #include "bolt/shuffle/sparksql/BoltShuffleWriter.h"
 #include "bolt/shuffle/sparksql/BoltShuffleWriterV2.h"
+#include "bolt/shuffle/sparksql/partition_writer/LocalPartitionWriter.h"
 using namespace bytedance::bolt::shuffle::sparksql;
 using namespace bytedance::bolt;
 using namespace bytedance::bolt::exec;
@@ -40,7 +43,9 @@ SparkShuffleWriter::SparkShuffleWriter(
       minMemLimit_(
           shuffleWriterOptions_.partitionWriterOptions.shuffleBufferSize),
       reportShuffleStatusCallback_(
-          shuffleWriterNode->getReportShuffleStatusCallback()) {}
+          shuffleWriterNode->getReportShuffleStatusCallback()) {
+  shuffleWriterOptions_.partitionWriterOptions.part = driverCtx->driverId;
+}
 
 void SparkShuffleWriter::init(const bytedance::bolt::RowVectorPtr& rv) {
   arrowPool_ = std::make_unique(pool());
@@ -85,18 +90,106 @@ void SparkShuffleWriter::addInput(RowVectorPtr input) {
 
 void SparkShuffleWriter::noMoreInput() {
   Operator::noMoreInput();
-  ShuffleWriterMetrics metrics;
-  if (shuffleWriter_) {
+  const bool isLocalPartitionWriter =
+      shuffleWriterOptions_.partitionWriterOptions.partitionWriterType ==
+      PartitionWriterType::kLocal;
+
+  if (shuffleWriter_ && isLocalPartitionWriter) {
     auto status = shuffleWriter_->stop();
     BOLT_CHECK(status.ok(), "Native shuffle write: ShuffleWriter stop failed");
-    metrics = shuffleWriter_->metrics();
-  } else {
-    metrics.partitionLengths = std::vector<int64_t>(
-        shuffleWriterOptions_.partitionWriterOptions.numPartitions, 0);
-    metrics.rawPartitionLengths = std::vector<int64_t>(
-        shuffleWriterOptions_.partitionWriterOptions.numPartitions, 0);
+  }
+
+  std::vector<ContinuePromise> promises;
+  std::vector<std::shared_ptr<Driver>> peers;
+  if (!operatorCtx_->task()->allPeersFinished(
+          planNodeId(), operatorCtx_->driver(), &future_, promises, peers)) {
+    BOLT_CHECK(future_.valid());
+    LOG(INFO) << "SparkShuffleWriter finished but not the last one.";
+    return;
+  }
+
+  LOG(INFO) << "Last SparkShuffleWriter finished.";
+  if (shuffleWriter_ && !isLocalPartitionWriter) {
+    auto status = shuffleWriter_->stop();
+    BOLT_CHECK(status.ok(), "Native shuffle write: ShuffleWriter stop failed");
+  }
+
+  std::vector<std::vector<int64_t>> peerPartitionLengths;
+  std::vector<std::string> peerDataFiles;
+  ShuffleWriterMetrics metrics;
+  {
+    auto promisesGuard = folly::makeGuard([&]() {
+      peers.clear();
+      for (auto& promise : promises) {
+        promise.setValue();
+      }
+    });
+
+    for (auto& peer : peers) {
+      auto* op = peer->findOperator(planNodeId());
+      auto* writer = dynamic_cast<SparkShuffleWriter*>(op);
+      BOLT_CHECK_NOT_NULL(writer);
+      if (writer->shuffleWriter_) {
+        if (!isLocalPartitionWriter) {
+          auto status = writer->shuffleWriter_->localStop();
+          BOLT_CHECK(
+              status.ok(), "Native shuffle write: ShuffleWriter stop failed");
+        }
+        peerDataFiles.push_back(writer->shuffleWriter_->metrics().dataFile);
+        peerPartitionLengths.push_back(
+            writer->shuffleWriter_->metrics().partitionLengths);
+        metrics += writer->shuffleWriter_->metrics();
+      }
+    }
+  }
+
+  if (shuffleWriter_) {
+    peerDataFiles.push_back(shuffleWriter_->metrics().dataFile);
+    peerPartitionLengths.push_back(shuffleWriter_->metrics().partitionLengths);
+    metrics += shuffleWriter_->metrics();
+  }
+
+  const auto numPartitions =
+      shuffleWriterOptions_.partitionWriterOptions.numPartitions;
+  for (const auto& partitionLengths : peerPartitionLengths) {
+    BOLT_CHECK_EQ(
+        partitionLengths.size(),
+        numPartitions,
+        "partitionLengths size={} not equal to numPartitions={}",
+        partitionLengths.size(),
+        numPartitions);
+  }
+
+  if (!peerDataFiles.empty() && isLocalPartitionWriter) {
+    if (peerDataFiles.size() == 1) {
+      const auto& srcFileName = peerDataFiles[0];
+      LOG(INFO) << "SparkShuffleWriter: rename shuffle file " << srcFileName
+                << " -> "
+                << shuffleWriterOptions_.partitionWriterOptions.dataFile;
+      auto localFs = std::make_shared<arrow::fs::LocalFileSystem>();
+      auto status = localFs->Move(
+          srcFileName, shuffleWriterOptions_.partitionWriterOptions.dataFile);
+      BOLT_CHECK(
+          status.ok(),
+          "SparkShuffleWriter: rename shuffle file {} -> {} failed",
+          srcFileName,
+          shuffleWriterOptions_.partitionWriterOptions.dataFile);
+    } else {
+      LOG(INFO) << "SparkShuffleWriter merge [" << peerDataFiles.size()
+                << "] shuffle files -> "
+                << shuffleWriterOptions_.partitionWriterOptions.dataFile;
+      auto status = LocalPartitionWriter::merge(
+          peerDataFiles,
+          peerPartitionLengths,
+          shuffleWriterOptions_.partitionWriterOptions.dataFile);
+      BOLT_CHECK(status.ok(), "Merge shuffle files failed");
+    }
+  }
 
-    LOG(INFO) << "ShuffleWriter is null";
+  if (peerPartitionLengths.empty()) {
+    metrics.partitionLengths = std::vector<int64_t>(numPartitions, 0);
+    metrics.rawPartitionLengths = std::vector<int64_t>(numPartitions, 0);
+    LOG(INFO) << "No SparkShuffleWriter generates shuffle data";
   }
   reportShuffleStatusCallback_(metrics);
diff --git a/bolt/shuffle/sparksql/ShuffleWriterNode.h b/bolt/shuffle/sparksql/ShuffleWriterNode.h
index 327499b72..49e0badce 100644
--- a/bolt/shuffle/sparksql/ShuffleWriterNode.h
+++ b/bolt/shuffle/sparksql/ShuffleWriterNode.h
@@ -107,12 +107,16 @@ class SparkShuffleWriter : public bytedance::bolt::exec::Operator {
   bytedance::bolt::RowVectorPtr getOutput() override;
 
   bytedance::bolt::exec::BlockingReason isBlocked(
-      bytedance::bolt::ContinueFuture* /* unused */) override {
-    return bytedance::bolt::exec::BlockingReason::kNotBlocked;
+      bytedance::bolt::ContinueFuture* future) override {
+    if (!future_.valid()) {
+      return bytedance::bolt::exec::BlockingReason::kNotBlocked;
+    }
+    *future = std::move(future_);
+    return bytedance::bolt::exec::BlockingReason::kWaitForShuffle;
   }
 
   bool isFinished() override {
-    return finished_;
+    return !future_.valid() && finished_;
   }
 
   void noMoreInput() override;
@@ -135,6 +139,8 @@ class SparkShuffleWriter : public bytedance::bolt::exec::Operator {
   std::shared_ptr<ShuffleWriter> shuffleWriter_;
   bool finished_ = false;
   ReportShuffleStatusCallback reportShuffleStatusCallback_;
+  bytedance::bolt::ContinueFuture future_{
+      bytedance::bolt::ContinueFuture::makeEmpty()};
 };
 
 class SparkShuffleWriterTranslator
diff --git a/bolt/shuffle/sparksql/partition_writer/LocalPartitionWriter.cpp b/bolt/shuffle/sparksql/partition_writer/LocalPartitionWriter.cpp
index 289ec878c..5f6a6d254 100644
--- a/bolt/shuffle/sparksql/partition_writer/LocalPartitionWriter.cpp
+++ b/bolt/shuffle/sparksql/partition_writer/LocalPartitionWriter.cpp
@@ -834,6 +834,7 @@ arrow::Status LocalPartitionWriter::populateMetrics(
   metrics->totalBytesWritten += totalBytesWritten_;
   metrics->partitionLengths = std::move(partitionLengths_);
   metrics->rawPartitionLengths = std::move(rawPartitionLengths_);
+  metrics->dataFile = dataFile_;
   return arrow::Status::OK();
 }
 
@@ -1016,4 +1017,68 @@ arrow::Status LocalPartitionWriter::mergeRowSpills(uint32_t partitionId) {
   return arrow::Status::OK();
 }
 
+arrow::Status LocalPartitionWriter::merge(
+    std::vector<std::string> dataFiles,
+    std::vector<std::vector<int64_t>> partitionLengths,
+    const std::string& targetFileName) {
+  constexpr int64_t kMergeChunkBytes = 8 << 20;
+
+  BOLT_CHECK_EQ(
+      dataFiles.size(),
+      partitionLengths.size(),
+      "size dataFiles({}) != size of partitionLengths({})",
+      dataFiles.size(),
+      partitionLengths.size());
+  BOLT_CHECK(
+      !targetFileName.empty(),
+      "LocalPartitionWriter::merge targetFileName is empty");
+  BOLT_CHECK_GT(dataFiles.size(), 0);
+
+  std::vector<std::shared_ptr<arrow::io::ReadableFile>> fileHandles;
+  fileHandles.reserve(dataFiles.size());
+  const auto numPartitions = partitionLengths[0].size();
+
+  for (const auto& file : dataFiles) {
+    BOLT_CHECK(
+        !file.empty(), "LocalPartitionWriter cannot merge an empty file");
+    ARROW_ASSIGN_OR_RAISE(auto handle, arrow::io::ReadableFile::Open(file));
+    fileHandles.push_back(std::move(handle));
+  }
+
+  ARROW_ASSIGN_OR_RAISE(
+      auto finalOs, arrow::io::FileOutputStream::Open(targetFileName, false));
+
+  for (size_t pid = 0; pid < numPartitions; ++pid) {
+    for (size_t fid = 0; fid < dataFiles.size(); ++fid) {
+      BOLT_CHECK_EQ(partitionLengths[fid].size(), numPartitions);
+      int64_t remainingBytes = partitionLengths[fid][pid];
+      while (remainingBytes > 0) {
+        const auto chunkBytes = std::min(kMergeChunkBytes, remainingBytes);
+        ARROW_ASSIGN_OR_RAISE(auto buffer, fileHandles[fid]->Read(chunkBytes));
+        BOLT_CHECK_EQ(
+            buffer->size(),
+            chunkBytes,
+            "Unexpected EOF while merging shuffle file {}, expected {} bytes, got {} bytes",
+            dataFiles[fid],
+            chunkBytes,
+            buffer->size());
+        ARROW_CHECK_OK(finalOs->Write(buffer));
+        remainingBytes -= chunkBytes;
+      }
+    }
+  }
+
+  ARROW_CHECK_OK(finalOs->Close());
+  for (auto& handle : fileHandles) {
+    ARROW_CHECK_OK(handle->Close());
+  }
+
+  auto localFs = std::make_shared<arrow::fs::LocalFileSystem>();
+  for (const auto& file : dataFiles) {
+    ARROW_CHECK_OK(localFs->DeleteFile(file));
+  }
+
+  return arrow::Status::OK();
+}
+
 } // namespace bytedance::bolt::shuffle::sparksql
diff --git a/bolt/shuffle/sparksql/partition_writer/LocalPartitionWriter.h b/bolt/shuffle/sparksql/partition_writer/LocalPartitionWriter.h
index 12ed2d64a..1ae5b66ca 100644
--- a/bolt/shuffle/sparksql/partition_writer/LocalPartitionWriter.h
+++ b/bolt/shuffle/sparksql/partition_writer/LocalPartitionWriter.h
@@ -33,6 +33,7 @@
 #include
 #include
+#include
 #include "bolt/shuffle/sparksql/partition_writer/PartitionWriter.h"
 
 namespace bytedance::bolt::shuffle::sparksql {
@@ -114,6 +115,13 @@ class LocalPartitionWriter : public PartitionWriter {
 
   class PartitionRowWriter;
 
+  static arrow::Status merge(
+      std::vector<std::string> dataFiles,
+      std::vector<std::vector<int64_t>> partitionLengths,
+      const std::string& targetFileName);
+
+  arrow::Status populateMetrics(ShuffleWriterMetrics* metrics) override;
+
  private:
   void init();
 
@@ -129,8 +137,6 @@ class LocalPartitionWriter : public PartitionWriter {
 
   arrow::Status clearResource();
 
-  arrow::Status populateMetrics(ShuffleWriterMetrics* metrics);
-
   std::string dataFile_;
 
   std::vector<std::string> localDirs_;
diff --git a/bolt/shuffle/sparksql/partition_writer/PartitionWriter.cpp b/bolt/shuffle/sparksql/partition_writer/PartitionWriter.cpp
index faab9c882..a2e8f5aa9 100644
--- a/bolt/shuffle/sparksql/partition_writer/PartitionWriter.cpp
+++ b/bolt/shuffle/sparksql/partition_writer/PartitionWriter.cpp
@@ -26,11 +26,16 @@ std::unique_ptr<PartitionWriter> PartitionWriter::create(
     PartitionWriterOptions options,
     arrow::MemoryPool* pool) {
   if (options.partitionWriterType == PartitionWriterType::kLocal) {
+    std::stringstream fileName;
+    fileName << options.dataFile;
+    if (options.part >= 0) {
+      fileName << ".part_" << options.part;
+    }
     return std::make_unique<LocalPartitionWriter>(
         options.numPartitions,
         options,
         pool,
-        options.dataFile,
+        fileName.str(),
         options.configuredDirs);
   } else if (options.partitionWriterType == PartitionWriterType::kCeleborn) {
     return std::make_unique<CelebornPartitionWriter>(
diff --git a/bolt/shuffle/sparksql/partition_writer/PartitionWriter.h b/bolt/shuffle/sparksql/partition_writer/PartitionWriter.h
index 00207d20d..f1155e596 100644
--- a/bolt/shuffle/sparksql/partition_writer/PartitionWriter.h
+++ b/bolt/shuffle/sparksql/partition_writer/PartitionWriter.h
@@ -71,6 +71,8 @@ class PartitionWriter {
 
   virtual arrow::Status stop(ShuffleWriterMetrics* metrics) = 0;
 
+  virtual arrow::Status populateMetrics(ShuffleWriterMetrics* metrics) = 0;
+
   /// Evict buffers for `partitionId` partition.
   virtual arrow::Status evict(
       uint32_t partitionId,
diff --git a/bolt/shuffle/sparksql/partition_writer/rss/CelebornPartitionWriter.cpp b/bolt/shuffle/sparksql/partition_writer/rss/CelebornPartitionWriter.cpp
index 34d90423e..908477049 100644
--- a/bolt/shuffle/sparksql/partition_writer/rss/CelebornPartitionWriter.cpp
+++ b/bolt/shuffle/sparksql/partition_writer/rss/CelebornPartitionWriter.cpp
@@ -43,10 +43,15 @@ void CelebornPartitionWriter::init() {
 }
 
 arrow::Status CelebornPartitionWriter::stop(ShuffleWriterMetrics* metrics) {
+  celebornClient_->stop();
+  return populateMetrics(metrics);
+}
+
+arrow::Status CelebornPartitionWriter::populateMetrics(
+    ShuffleWriterMetrics* metrics) {
   // Push data and collect metrics.
   auto totalBytesEvicted =
       std::accumulate(bytesEvicted_.begin(), bytesEvicted_.end(), 0LL);
-  celebornClient_->stop();
 
   // Populate metrics.
   metrics->totalCompressTime += compressTime_;
   metrics->totalEvictTime += spillTime_;
diff --git a/bolt/shuffle/sparksql/partition_writer/rss/CelebornPartitionWriter.h b/bolt/shuffle/sparksql/partition_writer/rss/CelebornPartitionWriter.h
index ff9c4c668..ba575393b 100644
--- a/bolt/shuffle/sparksql/partition_writer/rss/CelebornPartitionWriter.h
+++ b/bolt/shuffle/sparksql/partition_writer/rss/CelebornPartitionWriter.h
@@ -61,6 +61,8 @@ class CelebornPartitionWriter final : public RemotePartitionWriter {
 
   arrow::Status stop(ShuffleWriterMetrics* metrics) override;
 
+  arrow::Status populateMetrics(ShuffleWriterMetrics* metrics) override;
+
   // for BoltRowBasedSortShuffleWriter
   arrow::Status evict(
       std::vector>& rows,
diff --git a/bolt/shuffle/sparksql/tests/ShuffleMiscTest.cpp b/bolt/shuffle/sparksql/tests/ShuffleMiscTest.cpp
index c1765712f..8eb806e23 100644
--- a/bolt/shuffle/sparksql/tests/ShuffleMiscTest.cpp
+++ b/bolt/shuffle/sparksql/tests/ShuffleMiscTest.cpp
@@ -14,6 +14,16 @@
  * limitations under the License.
  */
 
+#include
+#include
+#include
+
+#include
+#include
+#include
+
+#include "bolt/exec/tests/utils/TempDirectoryPath.h"
+#include "bolt/shuffle/sparksql/partition_writer/LocalPartitionWriter.h"
 #include "bolt/shuffle/sparksql/tests/ShuffleTestBase.h"
 
 namespace bytedance::bolt::shuffle::sparksql::test {
@@ -152,4 +162,49 @@ TEST_F(ShuffleMiscTest, SkewedDictionaryStringEstimateFlatSize) {
   executeTestWithCustomInput(param, inputData);
 }
 
+TEST_F(ShuffleMiscTest, MergeTruncatesExistingTargetFile) {
+  auto tempDir = exec::test::TempDirectoryPath::create();
+  const auto file1 = tempDir->path + "/shuffle_data_0.bin";
+  const auto file2 = tempDir->path + "/shuffle_data_1.bin";
+  const auto targetFile = tempDir->path + "/merged.bin";
+
+  auto writeFile = [](const std::string& path, const std::string& data) {
+    auto outputStreamResult = arrow::io::FileOutputStream::Open(path, false);
+    ASSERT_TRUE(outputStreamResult.ok())
+        << outputStreamResult.status().ToString();
+    auto outputStream = outputStreamResult.ValueOrDie();
+
+    auto buffer = arrow::Buffer::FromString(data);
+
+    ASSERT_TRUE(outputStream->Write(buffer).ok());
+    ASSERT_TRUE(outputStream->Close().ok());
+  };
+
+  writeFile(file1, "ac");
+  writeFile(file2, "bd");
+  writeFile(targetFile, "stale-bytes");
+
+  const auto status =
+      LocalPartitionWriter::merge({file1, file2}, {{1, 1}, {1, 1}}, targetFile);
+  ASSERT_TRUE(status.ok()) << status.ToString();
+
+  auto mergedInputResult = arrow::io::ReadableFile::Open(targetFile);
+  ASSERT_TRUE(mergedInputResult.ok()) << mergedInputResult.status().ToString();
+  auto mergedInput = mergedInputResult.ValueOrDie();
+
+  auto sizeResult = mergedInput->GetSize();
+  ASSERT_TRUE(sizeResult.ok()) << sizeResult.status().ToString();
+  auto mergedSize = sizeResult.ValueOrDie();
+  EXPECT_EQ(mergedSize, 4);
+
+  auto readResult = mergedInput->Read(mergedSize);
+  ASSERT_TRUE(readResult.ok()) << readResult.status().ToString();
+  auto mergedBuffer = readResult.ValueOrDie();
+  ASSERT_TRUE(mergedInput->Close().ok());
+
+  EXPECT_EQ(mergedBuffer->ToString(), "abcd");
+  EXPECT_FALSE(std::filesystem::exists(file1));
+  EXPECT_FALSE(std::filesystem::exists(file2));
+}
+
 } // namespace bytedance::bolt::shuffle::sparksql::test